我有一个返回期货列表的方法
List<Future<O>> futures = getFutures();
现在,我想要等待,直到所有的future都成功地完成处理,或者由future返回输出的任何任务抛出异常。即使一个任务抛出异常,等待其他未来也没有意义。
简单的方法就是
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
但这里的问题是,例如,如果第4个future抛出异常,那么我将不必要地等待前3个future可用。
如何解决这个问题?倒计时插销有什么帮助吗?我无法使用未来的isDone,因为java文档说
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
您可以使用ExecutorCompletionService。文档甚至为您的确切用例提供了一个示例:
相反,假设你想使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备就绪时取消所有其他任务:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {
}
}
} finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
这里需要注意的重要一点是,ec .take()将获得第一个完成的任务,而不仅仅是第一个提交的任务。因此,您应该按照完成执行(或抛出异常)的顺序获取它们。
如果你正在使用Java 8并且不想操纵CompletableFutures,我已经编写了一个工具来检索List<Future<T>>使用流的结果。关键是禁止在它抛出时映射(Future::get)。
public final class Futures
{
private Futures()
{}
public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
{
return new FutureCollector<>();
}
private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
{
private final List<Throwable> exceptions = new LinkedList<>();
@Override
public Supplier<Collection<T>> supplier()
{
return LinkedList::new;
}
@Override
public BiConsumer<Collection<T>, Future<T>> accumulator()
{
return (r, f) -> {
try
{
r.add(f.get());
}
catch (InterruptedException e)
{}
catch (ExecutionException e)
{
exceptions.add(e.getCause());
}
};
}
@Override
public BinaryOperator<Collection<T>> combiner()
{
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}
@Override
public Function<Collection<T>, List<T>> finisher()
{
return l -> {
List<T> ret = new ArrayList<>(l);
if (!exceptions.isEmpty())
throw new AggregateException(exceptions, ret);
return ret;
};
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics()
{
return java.util.Collections.emptySet();
}
}
这需要一个像c#一样工作的AggregateException
public class AggregateException extends RuntimeException
{
/**
*
*/
private static final long serialVersionUID = -4477649337710077094L;
private final List<Throwable> causes;
private List<?> successfulElements;
public AggregateException(List<Throwable> causes, List<?> l)
{
this.causes = causes;
successfulElements = l;
}
public AggregateException(List<Throwable> causes)
{
this.causes = causes;
}
@Override
public synchronized Throwable getCause()
{
return this;
}
public List<Throwable> getCauses()
{
return causes;
}
public List<?> getSuccessfulElements()
{
return successfulElements;
}
public void setSuccessfulElements(List<?> successfulElements)
{
this.successfulElements = successfulElements;
}
}
该组件的作用与c#的Task.WaitAll完全相同。我正在开发一个与CompletableFuture相同的变体。allOf(相当于Task.WhenAll)
我这样做的原因是我正在使用Spring的ListenableFuture,并不想移植到CompletableFuture,尽管这是一种更标准的方式
也许这会有帮助(没有什么会被原始的线取代,耶!)
我建议用一个分开的线程运行每个Future家伙(它们并行),然后当其中一个得到错误时,它只是信号管理器(Handler类)。
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}
private List<Future<Object>> getFutures() {
return null;
}
public synchronized void cancelOther(){if(failed){return;}
failed=true;
for(Thread tx:trds){
tx.stop();//Deprecated but works here like a boss
}thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}
我不得不说上面的代码会出错(没有检查),但我希望我可以解释解决方案。请试一试。