我需要一次执行一定数量的任务4,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
    taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

当所有这些都完成后,我如何得到通知?现在我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0;或获取一个期货列表,并在无限循环监视器isDone为所有它们。不涉及无限循环的更好的解决方案是什么?

谢谢。


基本上在ExecutorService上调用shutdown(),然后调用awaitterminate ():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
  taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
  ...
}

使用CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
  taskExecutor.execute(new MyTask());
}

try {
  latch.await();
} catch (InterruptedException E) {
   // handle
}

在任务中(在try / finally中附上)

latch.countDown();

你可以把你的任务包装在另一个可运行文件中,它会发送通知:

taskExecutor.execute(new Runnable() {
  public void run() {
    taskStartedNotification();
    new MyTask().run();
    taskFinishedNotification();
  }
});

您可以使用自己的ExecutorCompletionService子类来包装taskExecutor,并使用自己的BlockingQueue实现在每个任务完成时获得通知,并在完成的任务数量达到预期目标时执行任何回调或其他操作。

ExecutorService.invokeAll()为您完成。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

Java 5及以后版本中的CyclicBarrier类就是为这类事情而设计的。

这只是我的个人意见。 为了克服CountDownLatch预先知道任务数量的要求,您可以使用简单的Semaphore来使用旧的方式。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
    taskExecutor.execute(new MyTask());
    numberOfTasks++;
}

try {
    s.aquire(numberOfTasks);
...

在任务中调用s.release()就像调用latch.countDown()一样;

你也可以使用期货列表:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
  public Void call() throws IOException {
     // do something
    return null;
  }
}));

然后当你想要连接所有线程时,它本质上等同于连接每个线程,(额外的好处是它会从子线程重新引发异常到主线程):

for(Future f: this.futures) { f.get(); }

Basically the trick is to call .get() on each Future one at a time, instead of infinite looping calling isDone() on (all or each). So you're guaranteed to "move on" through and past this block as soon as the last thread finishes. The caveat is that since the .get() call re-raises exceptions, if one of the threads dies, you would raise from this possibly before the other threads have finished to completion [to avoid this, you could add a catch ExecutionException around the get call]. The other caveat is it keeps a reference to all threads so if they have thread local variables they won't get collected till after you get past this block (though you might be able to get around this, if it became a problem, by removing Future's off the ArrayList). If you wanted to know which Future "finishes first" you could use some something like https://stackoverflow.com/a/31885029/32453

我刚刚写了一个示例程序来解决你的问题。这里没有给出简洁的实现,所以我将添加一个。虽然您可以使用executor.shutdown()和executor. awaitterminate(),但这不是最佳实践,因为不同线程所花费的时间是不可预测的。

ExecutorService es = Executors.newCachedThreadPool();
    List<Callable<Integer>> tasks = new ArrayList<>();

    for (int j = 1; j <= 10; j++) {
        tasks.add(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                int sum = 0;
                System.out.println("Starting Thread "
                        + Thread.currentThread().getId());

                for (int i = 0; i < 1000000; i++) {
                    sum += i;
                }

                System.out.println("Stopping Thread "
                        + Thread.currentThread().getId());
                return sum;
            }

        });
    }

    try {
        List<Future<Integer>> futures = es.invokeAll(tasks);
        int flag = 0;

        for (Future<Integer> f : futures) {
            Integer res = f.get();
            System.out.println("Sum: " + res);
            if (!f.isDone()) 
                flag = 1;
        }

        if (flag == 0)
            System.out.println("SUCCESS");
        else
            System.out.println("FAILED");

    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

在执行器getActiveCount() -中有一个方法可以给出活动线程的计数。

在跨越线程之后,我们可以检查activeCount()值是否为0。一旦该值为零,就意味着当前没有活动线程在运行,这意味着任务已经完成:

while (true) {
    if (executor.getActiveCount() == 0) {
    //ur own piece of code
    break;
    }
}

有点晚了,但为了完成…

不要“等待”所有的任务都完成,你可以用好莱坞的原则来思考,“不要给我打电话,我会给你打电话”——当我完成的时候。 我认为结果代码更优雅…

番石榴提供了一些有趣的工具来实现这一点。

一个例子:

将ExecutorService包装成ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

提交一个可调用对象的集合来执行::

for (Callable<Integer> callable : callables) {
  ListenableFuture<Integer> lf = service.submit(callable);
  // listenableFutures is a collection
  listenableFutures.add(lf)
});

现在最重要的部分:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

附加一个回调到ListenableFuture,当所有future完成时,你可以使用它来得到通知:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
    @Override
    public void onSuccess(List<Integer> result) {
        // do something with all the results
    }

    @Override
    public void onFailure(Throwable t) {
        // log failure
    }
});

这也提供了一个好处,一旦处理完成,您就可以在一个地方收集所有的结果……

更多信息请点击这里

这可能会有所帮助

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
                try {
                    Log.i(LOG_TAG, "Waiting for executor to terminate...");
                    if (executor.isTerminated())
                        break;
                    if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                        break;
                    }
                } catch (InterruptedException ignored) {}
            }

只是在这里提供更多不同于使用闩锁/屏障的选择。 你也可以在它们全部使用完CompletionService之前得到部分结果。

来自Java并发实践: “如果您有一批计算要提交给Executor,并且您希望检索它们的结果 可用时,您可以保留与每个任务关联的Future,并通过调用get来重复轮询完成 超时为0。这是可能的,但很乏味。幸运的是,还有更好的方法:完井服务。”

这里是实现

public class TaskSubmiter {
    private final ExecutorService executor;
    TaskSubmiter(ExecutorService executor) { this.executor = executor; }
    void doSomethingLarge(AnySourceClass source) {
        final List<InterestedResult> info = doPartialAsyncProcess(source);
        CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
        for (final InterestedResult interestedResultItem : info)
            completionService.submit(new Callable<PartialResult>() {
                public PartialResult call() {
                    return InterestedResult.doAnOperationToGetPartialResult();
                }
        });

    try {
        for (int t = 0, n = info.size(); t < n; t++) {
            Future<PartialResult> f = completionService.take();
            PartialResult PartialResult = f.get();
            processThisSegment(PartialResult);
            }
        } 
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } 
        catch (ExecutionException e) {
            throw somethinghrowable(e.getCause());
        }
    }
}

遵循以下方法之一。

遍历从ExecutorService上的submit返回的所有Future任务,并按照Kiran的建议使用Future对象上的阻塞调用get()检查状态 在ExecutorService上使用invokeAll() CountDownLatch ForkJoinPool或Executors.html#newWorkStealingPool 按照适当的顺序使用ThreadPoolExecutor的shutdown、awaitterminate、shutdownNow api

相关的SE问题:

CountDownLatch是如何在Java多线程中使用的?

如何正确关闭java ExecutorService

你可以在这个Runner类上调用waitTillDone():

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool

while(...) {
    runner.run(new MyTask()); // here you submit your task
}


runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)


runner.shutdown(); // once you done you can shutdown the runner

在调用shutdown()之前,您可以重用这个类并多次调用waitTillDone(),而且您的代码非常简单。你也不需要预先知道任务的数量。

要使用它,只需将gradle/maven compile 'com.github.matejtymes:javafixes:1.3.1'依赖项添加到你的项目中。

详情请点击这里:

https://github.com/MatejTymes/JavaFixes

在Java8中,你可以用CompletableFuture来实现:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

你应该使用executorService.shutdown()和executorService。awaitTermination方法。

示例如下:

public class ScheduledThreadPoolExample {

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
                0, 1, TimeUnit.SECONDS);

        TimeUnit.SECONDS.sleep(10);
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }

}

我们可以使用流API来处理流。请参阅下面的片段

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool

//alternatively to specify parallelism 
new ForkJoinPool(15).submit(
          () -> tasks.stream().parallel().forEach(Runnable::run) 
    ).get();

你可以使用下面的代码:

public class MyTask implements Runnable {

    private CountDownLatch countDownLatch;

    public MyTask(CountDownLatch countDownLatch {
         this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
         try {
             //Do somethings
             //
             this.countDownLatch.countDown();//important
         } catch (InterruptedException ex) {
              Thread.currentThread().interrupt();
         }
     }
}

CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
     taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

所以我把我的答案贴在这里,以防有人想要一个更简单的方法来做到这一点

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
    futures[i++] =  CompletableFuture.runAsync(runner, executor);
}

CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

这是我的解决方案,基于“亚当·天行者”的技巧,它很有效

package frss.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class TestHilos {

    void procesar() {
        ExecutorService es = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = getTasks();
        CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        es.shutdown();

        System.out.println("FIN DEL PROCESO DE HILOS");
    }

    private List<Runnable> getTasks() {
        List<Runnable> tasks = new ArrayList<Runnable>();

        Hilo01 task1 = new Hilo01();
        tasks.add(task1);

        Hilo02 task2 = new Hilo02();
        tasks.add(task2);
        return tasks;
    }

    private class Hilo01 extends Thread {

        @Override
        public void run() {
            System.out.println("HILO 1");
        }

    }

    private class Hilo02 extends Thread {

        @Override
        public void run() {
            try {
                sleep(2000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("HILO 2");
        }

    }


    public static void main(String[] args) {
        TestHilos test = new TestHilos();
        test.procesar();
    }
}

这里有两个选择,只是有点困惑,哪个是最好的。

选项1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
                               .map(task -> CompletableFuture.runAsync(task, es))
                               .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();    
es.shutdown();

选项2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
    futures.add(es.submit(task));
}

for(Future<?> future : futures) {
    try {
        future.get();
    }catch(Exception e){
        // do logging and nothing else
    }
}
es.shutdown();

这里放入future.get();试抓是个好主意,对吧?

我创建了以下工作示例。其思想是有一种方法来处理具有许多线程(由numberOfTasks/阈值通过编程确定)的任务池(我使用队列作为示例),并等待所有线程完成后继续进行其他处理。

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Testing CountDownLatch and ExecutorService to manage scenario where
 * multiple Threads work together to complete tasks from a single
 * resource provider, so the processing can be faster. */
public class ThreadCountDown {

private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();

public static void main(String[] args) {
    // Create a queue with "Tasks"
    int numberOfTasks = 2000;
    while(numberOfTasks-- > 0) {
        tasks.add(numberOfTasks);
    }

    // Initiate Processing of Tasks
    ThreadCountDown main = new ThreadCountDown();
    main.process(tasks);
}

/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
    int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
    threadsCountdown = new CountDownLatch(numberOfThreads);
    ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);

    //Initialize each Thread
    while(numberOfThreads-- > 0) {
        System.out.println("Initializing Thread: "+numberOfThreads);
        threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
    }

    try {
        //Shutdown the Executor, so it cannot receive more Threads.
        threadExecutor.shutdown();
        threadsCountdown.await();
        System.out.println("ALL THREADS COMPLETED!");
        //continue With Some Other Process Here
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}

/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
    int threshold = 100;
    int threads = size / threshold;
    if( size > (threads*threshold) ){
        threads++;
    }
    return threads;
}

/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
    return tasks.poll();
}

/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {

    private String threadName;

    protected MyThread(String threadName) {
        super();
        this.threadName = threadName;
    }

    @Override
    public void run() {
        Integer task;
        try{
            //Check in the Task pool if anything pending to process
            while( (task = getTask()) != null ){
                processTask(task);
            }
        }catch (Exception ex){
            ex.printStackTrace();
        }finally {
            /*Reduce count when no more tasks to process. Eventually all
            Threads will end-up here, reducing the count to 0, allowing
            the flow to continue after threadsCountdown.await(); */
            threadsCountdown.countDown();
        }
    }

    private void processTask(Integer task){
        try{
            System.out.println(this.threadName+" is Working on Task: "+ task);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}
}

希望能有所帮助!


ExecutorService WORKER_THREAD_POOL 
  = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
    WORKER_THREAD_POOL.submit(() -> {
        try {
            // doSomething();
            latch.countDown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// wait for the latch to be decremented by the two remaining threads
latch.await();

如果doSomething()抛出一些其他异常,latch.countDown()似乎不会执行,那么我该怎么办?

如果你连续使用了更多的ExecutionServices线程,并且想要等待每个EXECUTIONSERVICE完成。最好的办法是像下面这样;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
   executer1.execute(new Runnable() {
            @Override
            public void run() {
                ...
            }
        });
} 
executer1.shutdown();

try{
   executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

   ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
   for (true) {
      executer2.execute(new Runnable() {
            @Override
            public void run() {
                 ...
            }
        });
   } 
   executer2.shutdown();
} catch (Exception e){
 ...
}

使用ExecutorService的干净方式

 List<Future<Void>> results = null;
 try {
     List<Callable<Void>> tasks = new ArrayList<>();
     ExecutorService executorService = Executors.newFixedThreadPool(4);
     results = executorService.invokeAll(tasks);
 } catch (InterruptedException ex) {
     ...
 } catch (Exception ex) {
     ...
 }

Project Loom的AutoCloseable执行器服务上的Try-with-Resources语法

Project Loom试图为Java中的并发能力添加新特性。

其中一个特性是使ExecutorService可自动关闭。这意味着每个ExecutorService实现都将提供一个close方法。这意味着我们可以使用try-with-resources语法自动关闭ExecutorService对象。

executorservice# close方法将阻塞,直到所有提交的任务都完成。使用close代替了调用shutdown & awaitterminate。

自动关闭功能有助于Project Loom将“结构化并发”引入Java。

try (
    ExecutorService executorService = Executors.… ;
) {
    // Submit your `Runnable`/`Callable` tasks to the executor service.
    …
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

有关Project Loom的更多信息,请搜索由Ron Pressler和Project Loom团队的其他人所做的演讲和采访。关注更近期的,如Project Loom的发展。

Project Loom技术的实验版本现在已经可以使用了,它是基于早期的Java 18。