等待ExecutorService所有任务完成的最简单方法是什么?我的任务主要是计算性的,所以我只想运行大量的作业——每个核心上都有一个。现在我的设置是这样的:

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {   
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
} 
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask实现了runnable。这似乎正确地执行了任务,但代码在wait()时崩溃,并出现IllegalMonitorStateException。这是奇怪的,因为我玩了一些玩具的例子,它似乎工作。

uniquePhrases包含数万个元素。我应该用另一种方法吗?我在寻找一些尽可能简单的东西


当前回答

我将等待执行程序以您认为适合任务完成的指定超时终止。

 try {  
         //do stuff here 
         exe.execute(thread);
    } finally {
        exe.shutdown();
    }
    boolean result = exe.awaitTermination(4, TimeUnit.HOURS);
    if (!result)

    {
        LOGGER.error("It took more than 4 hour for the executor to stop, this shouldn't be the normal behaviour.");
    }

其他回答

你可以在一定的时间间隔内等待任务完成:

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);    
}

或者您可以使用ExecutorService.submit(Runnable)并收集它返回的Future对象,然后依次对每个对象调用get()以等待它们完成。

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

正确处理InterruptedException非常重要。它可以让您或库的用户安全地结束一个漫长的进程。

IllegalMonitorStateException的根本原因:

抛出该异常,表示线程试图等待对象的监视器,或通知其他线程等待对象的监视器而不拥有指定的监视器。

在代码中,您刚刚在ExecutorService上调用了wait(),但没有拥有锁。

下面的代码将修复IllegalMonitorStateException

try 
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

遵循以下方法中的一种来等待所有已提交给ExecutorService的任务的完成。

Iterate through all Future tasks from submit on ExecutorService and check the status with blocking call get() on Future object Using invokeAll on ExecutorService Using CountDownLatch Using ForkJoinPool or newWorkStealingPool of Executors(since java 8) Shutdown the pool as recommended in oracle documentation page void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } If you want to gracefully wait for all tasks for completion when you are using option 5 instead of options 1 to 4, change if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { to a while(condition) which checks for every 1 minute.

我还有一种情况,我有一组文档要爬取。我从一个应该被处理的初始“种子”文档开始,该文档包含到其他也应该被处理的文档的链接,等等。

在我的主程序中,我只想写如下所示的东西,其中Crawler控制了一堆线程。

Crawler c = new Crawler();
c.schedule(seedDocument); 
c.waitUntilCompletion()

如果我想在一棵树上导航,同样的情况也会发生;我将插入根节点,每个节点的处理器将根据需要向队列中添加子节点,一堆线程将处理树中的所有节点,直到没有更多的节点为止。

我在JVM中找不到任何东西,我觉得这有点令人惊讶。所以我写了一个类ThreadPool,一个可以直接使用或子类添加适合域的方法,例如schedule(Document)。希望能有所帮助!

ThreadPool Javadoc | Maven

在集合中添加所有线程并使用invokeAll提交。 如果您可以使用ExecutorService的invokeAll方法,那么JVM在所有线程完成之前不会进行下一行。

这里有一个很好的例子: 通过ExecutorService调用所有

您可以使用ExecutorService。它将执行所有任务并等待所有线程完成它们的任务。

这是完整的javadoc

您还可以使用此方法的重载版本来指定超时时间。

下面是ExecutorService.invokeAll的示例代码

public class Test {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService service = Executors.newFixedThreadPool(3);
        List<Callable<String>> taskList = new ArrayList<>();
        taskList.add(new Task1());
        taskList.add(new Task2());
        List<Future<String>> results = service.invokeAll(taskList);
        for (Future<String> f : results) {
            System.out.println(f.get());
        }
    }

}

class Task1 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(2000);
            return "Task 1 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task1";
        }
    }
}

class Task2 implements Callable<String> {
    @Override
    public String call() throws Exception {
        try {
            Thread.sleep(3000);
            return "Task 2 done";
        } catch (Exception e) {
            e.printStackTrace();
            return " error in task2";
        }
    }
}