我有一个返回期货列表的方法

List<Future<O>> futures = getFutures();

现在,我想要等待,直到所有的future都成功地完成处理,或者由future返回输出的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。

简单的方法就是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。

如何解决这个问题?倒计时插销有什么帮助吗?我无法使用未来的isDone,因为java文档说

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

当前回答

如果你使用的是Java 8,那么你可以通过CompletableFuture和CompletableFuture更容易地做到这一点。allOf,它只在所有提供的CompletableFutures完成后应用回调。

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

其他回答

如果你正在使用Java 8并且不想操纵CompletableFutures,我已经编写了一个工具来检索List<Future<T>>使用流的结果。关键是禁止在它抛出时映射(Future::get)。

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

这需要一个像c#一样工作的AggregateException

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

该组件的作用与c#的Task.WaitAll完全相同。我正在开发一个与CompletableFuture相同的变体。allOf(相当于Task.WhenAll)

我这样做的原因是我正在使用Spring的ListenableFuture,并不想移植到CompletableFuture,尽管这是一种更标准的方式

CompletionService将使用.submit()方法获取你的Callables,你可以使用.take()方法检索计算出的期货。

您一定不能忘记的一件事是通过调用.shutdown()方法来终止ExecutorService。此外,只有在保存了对执行器服务的引用时才能调用此方法,因此请确保保留一个引用。

示例代码-对于要并行处理的固定数量的工作项:

ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

CompletionService<YourCallableImplementor> completionService = 
new ExecutorCompletionService<YourCallableImplementor>(service);

ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();

for (String computeMe : elementsToCompute) {
    futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;

while(received < elementsToCompute.size()) {
 Future<YourCallableImplementor> resultFuture = completionService.take(); 
 YourCallableImplementor result = resultFuture.get();
 received ++;
}
//important: shutdown your ExecutorService
service.shutdown();

示例代码-对于要并行处理的动态数量的工作项:

public void runIt(){
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    CompletionService<CallableImplementor> completionService = new ExecutorCompletionService<CallableImplementor>(service);
    ArrayList<Future<CallableImplementor>> futures = new ArrayList<Future<CallableImplementor>>();

    //Initial workload is 8 threads
    for (int i = 0; i < 9; i++) {
        futures.add(completionService.submit(write.new CallableImplementor()));             
    }
    boolean finished = false;
    while (!finished) {
        try {
            Future<CallableImplementor> resultFuture;
            resultFuture = completionService.take();
            CallableImplementor result = resultFuture.get();
            finished = doSomethingWith(result.getResult());
            result.setResult(null);
            result = null;
            resultFuture = null;
            //After work package has been finished create new work package and add it to futures
            futures.add(completionService.submit(write.new CallableImplementor()));
        } catch (InterruptedException | ExecutionException e) {
            //handle interrupted and assert correct thread / work packet count              
        } 
    }

    //important: shutdown your ExecutorService
    service.shutdown();
}

public class CallableImplementor implements Callable{
    boolean result;

    @Override
    public CallableImplementor call() throws Exception {
        //business logic goes here
        return this;
    }

    public boolean getResult() {
        return result;
    }

    public void setResult(boolean result) {
        this.result = result;
    }
}

这就是我用来在未来列表上等待特定时间的方法。我认为它更干净。

CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// Some parallel work
        for (Something tp : somethings) {
            completionService.submit(() -> {
                try {
                   work(something)
                } catch (ConnectException e) {
                } finally {
                    countDownLatch.countDown();
                }
            });
        }    
  try {
        if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)){
        }
    } catch (InterruptedException e) {
    }

基于guava的解决方案可以使用Futures.FutureCombiner实现。

下面是在javadoc中给出的代码示例:

 final ListenableFuture<Instant> loginDateFuture =
     loginService.findLastLoginDate(username);
 final ListenableFuture<List<String>> recentCommandsFuture =
     recentCommandsService.findRecentCommands(username);
 ListenableFuture<UsageHistory> usageFuture =
     Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture)
         .call(
             () ->
                 new UsageHistory(
                     username,
                     Futures.getDone(loginDateFuture),
                     Futures.getDone(recentCommandsFuture)),
             executor);

要了解更多信息,请参阅用户指南的ListenableFutureExplained部分。

如果您对它的工作原理感到好奇,我建议查看源代码的这一部分:aggregatefuture .java#L127-L186

我有一个实用工具类,它包含这些:

@FunctionalInterface
public interface CheckedSupplier<X> {
  X get() throws Throwable;
}

public static <X> Supplier<X> uncheckedSupplier(final CheckedSupplier<X> supplier) {
    return () -> {
        try {
            return supplier.get();
        } catch (final Throwable checkedException) {
            throw new IllegalStateException(checkedException);
        }
    };
}

一旦你有了这些,使用静态导入,你可以像这样简单地等待所有的期货:

futures.stream().forEach(future -> uncheckedSupplier(future::get).get());

你也可以像这样收集他们的所有结果:

List<MyResultType> results = futures.stream()
    .map(future -> uncheckedSupplier(future::get).get())
    .collect(Collectors.toList());

只是重温我的旧帖子,注意到你有另一个悲伤:

但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。

在这种情况下,简单的解决方案是并行执行:

futures.stream().parallel()
 .forEach(future -> uncheckedSupplier(future::get).get());

这样,第一个异常虽然不会停止future,但会中断foreach语句,就像在串行示例中一样,但由于所有异常都是并行等待的,因此您不必等待前3个异常完成。