我有一个返回期货列表的方法

List<Future<O>> futures = getFutures();

现在,我想要等待,直到所有的future都成功地完成处理,或者由future返回输出的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。

简单的方法就是

wait() {

   For(Future f : futures) {
     try {
       f.get();
     } catch(Exception e) {
       //TODO catch specific exception
       // this future threw exception , means somone could not do its task
       return;
     }
   }
}

但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。

如何解决这个问题?倒计时插销有什么帮助吗?我无法使用未来的isDone,因为java文档说

boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.

当前回答

您可以使用ExecutorCompletionService。文档甚至为您的确切用例提供了一个示例:

相反,假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备就绪时取消所有其他任务:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
    CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    int n = solvers.size();
    List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
    Result result = null;
    try {
        for (Callable<Result> s : solvers)
            futures.add(ecs.submit(s));
        for (int i = 0; i < n; ++i) {
            try {
                Result r = ecs.take().get();
                if (r != null) {
                    result = r;
                    break;
                }
            } catch (ExecutionException ignore) {
            }
        }
    } finally {
        for (Future<Result> f : futures)
            f.cancel(true);
    }

    if (result != null)
        use(result);
}

这里需要注意的重要一点是,ec .take()将获得第一个完成的任务,而不仅仅是第一个提交的任务。因此,您应该按照完成执行(或抛出异常)的顺序获取它们。

其他回答

如果你正在使用Java 8并且不想操纵CompletableFutures,我已经编写了一个工具来检索List<Future<T>>使用流的结果。关键是禁止在它抛出时映射(Future::get)。

public final class Futures
{

    private Futures()
    {}

    public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
    {
        return new FutureCollector<>();
    }

    private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
    {
        private final List<Throwable> exceptions = new LinkedList<>();

        @Override
        public Supplier<Collection<T>> supplier()
        {
            return LinkedList::new;
        }

        @Override
        public BiConsumer<Collection<T>, Future<T>> accumulator()
        {
            return (r, f) -> {
                try
                {
                    r.add(f.get());
                }
                catch (InterruptedException e)
                {}
                catch (ExecutionException e)
                {
                    exceptions.add(e.getCause());
                }
            };
        }

        @Override
        public BinaryOperator<Collection<T>> combiner()
        {
            return (l1, l2) -> {
                l1.addAll(l2);
                return l1;
            };
        }

        @Override
        public Function<Collection<T>, List<T>> finisher()
        {
            return l -> {

                List<T> ret = new ArrayList<>(l);
                if (!exceptions.isEmpty())
                    throw new AggregateException(exceptions, ret);

                return ret;
            };

        }

        @Override
        public Set<java.util.stream.Collector.Characteristics> characteristics()
        {
            return java.util.Collections.emptySet();
        }
    }

这需要一个像c#一样工作的AggregateException

public class AggregateException extends RuntimeException
{
    /**
     *
     */
    private static final long serialVersionUID = -4477649337710077094L;

    private final List<Throwable> causes;
    private List<?> successfulElements;

    public AggregateException(List<Throwable> causes, List<?> l)
    {
        this.causes = causes;
        successfulElements = l;
    }

    public AggregateException(List<Throwable> causes)
    {
        this.causes = causes;
    }

    @Override
    public synchronized Throwable getCause()
    {
        return this;
    }

    public List<Throwable> getCauses()
    {
        return causes;
    }

    public List<?> getSuccessfulElements()
    {
        return successfulElements;
    }

    public void setSuccessfulElements(List<?> successfulElements)
    {
        this.successfulElements = successfulElements;
    }

}

该组件的作用与c#的Task.WaitAll完全相同。我正在开发一个与CompletableFuture相同的变体。allOf(相当于Task.WhenAll)

我这样做的原因是我正在使用Spring的ListenableFuture,并不想移植到CompletableFuture,尽管这是一种更标准的方式

如果你使用的是Java 8,那么你可以通过CompletableFuture和CompletableFuture更容易地做到这一点。allOf,它只在所有提供的CompletableFutures完成后应用回调。

// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.

public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
    CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);

    return CompletableFuture.allOf(cfs)
            .thenApply(ignored -> futures.stream()
                                    .map(CompletableFuture::join)
                                    .collect(Collectors.toList())
            );
}

这就是我用来在未来列表上等待特定时间的方法。我认为它更干净。

CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
// Some parallel work
        for (Something tp : somethings) {
            completionService.submit(() -> {
                try {
                   work(something)
                } catch (ConnectException e) {
                } finally {
                    countDownLatch.countDown();
                }
            });
        }    
  try {
        if (!countDownLatch.await(secondsToWait, TimeUnit.SECONDS)){
        }
    } catch (InterruptedException e) {
    }

在Java 8中使用CompletableFuture

    // Kick of multiple, asynchronous lookups
    CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
    CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
    CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");

    // Wait until they are all done
    CompletableFuture.allOf(page1,page2,page3).join();

    logger.info("--> " + page1.get());

也许这会有帮助(没有什么会被原始的线取代,耶!) 我建议用一个分开的线程运行每个Future家伙(它们并行),然后当其中一个得到错误时,它只是信号管理器(Handler类)。

class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
  thisThread=Thread.currentThread();
  List<Future<Object>> futures = getFutures();
  trds=new Thread[futures.size()];
  for (int i = 0; i < trds.length; i++) {
    RunTask rt=new RunTask(futures.get(i), this);
    trds[i]=new Thread(rt);
  }
  synchronized (this) {
    for(Thread tx:trds){
      tx.start();
    }  
  }
  for(Thread tx:trds){
    try {tx.join();
    } catch (InterruptedException e) {
      System.out.println("Job failed!");break;
    }
  }if(!failed){System.out.println("Job Done");}
}

private List<Future<Object>> getFutures() {
  return null;
}

public synchronized void cancelOther(){if(failed){return;}
  failed=true;
  for(Thread tx:trds){
    tx.stop();//Deprecated but works here like a boss
  }thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}

我不得不说上面的代码会出错(没有检查),但我希望我可以解释解决方案。请试一试。