我需要一次执行一定数量的任务4,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

当所有这些都完成后,我如何得到通知?现在我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0;或获取一个期货列表,并在无限循环监视器isDone为所有它们。不涉及无限循环的更好的解决方案是什么?

谢谢。


当前回答

有点晚了,但为了完成…

不要“等待”所有的任务都完成,你可以用好莱坞的原则来思考,“不要给我打电话,我会给你打电话”——当我完成的时候。 我认为结果代码更优雅…

番石榴提供了一些有趣的工具来实现这一点。

一个例子:

将ExecutorService包装成ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

提交一个可调用对象的集合来执行::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

现在最重要的部分:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

附加一个回调到ListenableFuture,当所有future完成时,你可以使用它来得到通知:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

这也提供了一个好处,一旦处理完成,您就可以在一个地方收集所有的结果……

更多信息请点击这里

其他回答

我创建了以下工作示例。其思想是有一种方法来处理具有许多线程(由numberOfTasks/阈值通过编程确定)的任务池(我使用队列作为示例),并等待所有线程完成后继续进行其他处理。

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

希望能有所帮助!

Project Loom的AutoCloseable执行器服务上的Try-with-Resources语法

Project Loom试图为Java中的并发能力添加新特性。

其中一个特性是使ExecutorService可自动关闭。这意味着每个ExecutorService实现都将提供一个close方法。这意味着我们可以使用try-with-resources语法自动关闭ExecutorService对象。

executorservice# close方法将阻塞,直到所有提交的任务都完成。使用close代替了调用shutdown & awaitterminate。

自动关闭功能有助于Project Loom将“结构化并发”引入Java。

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

有关Project Loom的更多信息,请搜索由Ron Pressler和Project Loom团队的其他人所做的演讲和采访。关注更近期的,如Project Loom的发展。

Project Loom技术的实验版本现在已经可以使用了,它是基于早期的Java 18。

你可以在这个Runner类上调用waitTillDone():

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

在调用shutdown()之前,您可以重用这个类并多次调用waitTillDone(),而且您的代码非常简单。你也不需要预先知道任务的数量。

要使用它,只需将gradle/maven compile 'com.github.matejtymes:javafixes:1.3.1'依赖项添加到你的项目中。

详情请点击这里:

https://github.com/MatejTymes/JavaFixes


ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

如果doSomething()抛出一些其他异常,latch.countDown()似乎不会执行,那么我该怎么办?

你也可以使用期货列表:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

然后当你想要连接所有线程时,它本质上等同于连接每个线程,(额外的好处是它会从子线程重新引发异常到主线程):

for(Future f: this.futures) { f.get(); }

Basically the trick is to call .get() on each Future one at a time, instead of infinite looping calling isDone() on (all or each). So you're guaranteed to "move on" through and past this block as soon as the last thread finishes. The caveat is that since the .get() call re-raises exceptions, if one of the threads dies, you would raise from this possibly before the other threads have finished to completion [to avoid this, you could add a catch ExecutionException around the get call]. The other caveat is it keeps a reference to all threads so if they have thread local variables they won't get collected till after you get past this block (though you might be able to get around this, if it became a problem, by removing Future's off the ArrayList). If you wanted to know which Future "finishes first" you could use some something like https://stackoverflow.com/a/31885029/32453