I have a problem trying out Java 8 lambda expressions. Usually they work fine, but now I have methods that throw IOException. It's best to look at the following code:

class Bank{
    ....
    public Set<String> getActiveAccountNumbers() throws IOException {
        Stream<Account> s =  accounts.values().stream();
        s = s.filter(a -> a.isActive());
        Stream<String> ss = s.map(a -> a.getNumber());
        return ss.collect(Collectors.toSet());
    }
    ....
}

interface Account{
    ....
    boolean isActive() throws IOException;
    String getNumber() throws IOException;
    ....
}

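The problem is that it doesn't compile, because I have to catch the possible exceptions thrown by the isActive and getNumber methods. But even if I explicitly use a try-catch block as shown below, it still doesn't compile, because the compiler says I am not catching the exception. So either there is a bug in the JDK, or I don't know how to catch these exceptions.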

class Bank{
    ....
    //Doesn't compile either
    public Set<String> getActiveAccountNumbers() throws IOException {
        try{
            Stream<Account> s =  accounts.values().stream();
            s = s.filter(a -> a.isActive());
            Stream<String> ss = s.map(a -> a.getNumber());
            return ss.collect(Collectors.toSet());
        }catch(IOException ex){
        }
    }
    ....
}

How can I get this to work? Can anyone give me a hint?


Current answer

Your example can be written as:

import utils.stream.Unthrow;

class Bank{
   ....
   public Set<String> getActiveAccountNumbers() {
       return accounts.values().stream()
           .filter(a -> Unthrow.wrap(() -> a.isActive()))
           .map(a -> Unthrow.wrap(() -> a.getNumber()))
           .collect(Collectors.toSet());
   }
   ....
}

The Unthrow class can be found here: https://github.com/SeregaLBN/StreamUnthrower
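To make the idea concrete, here is a minimal sketch of what a wrapper of this kind might do. It is not the actual Unthrow source: `WrapSketch`, `ThrowingSupplier`, `wrap` and the `account` factory are hypothetical names used only for illustration. The assumption is simply that checked exceptions are converted into unchecked ones so the lambda fits `Predicate` and `Function`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, NOT the real Unthrow implementation.
public class WrapSketch {

    // A supplier whose body is allowed to throw a checked exception.
    @FunctionalInterface
    public interface ThrowingSupplier<T> {
        T get() throws Exception;
    }

    // Runs the supplier and converts checked exceptions into unchecked ones.
    public static <T> T wrap(ThrowingSupplier<T> supplier) {
        try {
            return supplier.get();
        } catch (RuntimeException e) {
            throw e;                            // already unchecked: pass through
        } catch (IOException e) {
            throw new UncheckedIOException(e);  // keeps the IOException as cause
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // Stand-in for the Account interface from the question.
    public interface Account {
        boolean isActive() throws IOException;
        String getNumber() throws IOException;
    }

    // Illustrative factory for an in-memory account.
    public static Account account(String number, boolean active) {
        return new Account() {
            @Override public boolean isActive() { return active; }
            @Override public String getNumber() { return number; }
        };
    }

    public static Set<String> activeNumbers(List<Account> accounts) {
        return accounts.stream()
                .filter(a -> wrap(() -> a.isActive()))
                .map(a -> wrap(() -> a.getNumber()))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Account> accounts =
                Arrays.asList(account("A-1", true), account("B-2", false));
        System.out.println(activeNumbers(accounts)); // prints [A-1]
    }
}
```

Note the trade-off: the method no longer declares IOException, so callers that care about I/O failures have to catch the unchecked wrapper instead.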

Other answers

You can also use an error indicator that lives outside the stream and throw the exception at a higher level:

List<String> errorMessages = new ArrayList<>(); // error indicator
//..
errorMessages.clear();

List<String> names = new ArrayList<>(Arrays.asList("andrey", "angela", "pamela"));

names.stream()
.map(name -> {
    if (!"pamela".equals(name)) { // compare String contents, not references
      errorMessages.add(name + " is wrong here!"); 
      return null; // triggering the indicator
    }
    return name;
} )
.filter(elem -> (elem != null)) // bypassing propagation of only current unwanted data
//.filter(elem -> (errorMessages.size() == 0)) // or blocking any propagation once unwanted data detected
.forEach(System.out::println);

if (errorMessages.size() > 0) { // handling the indicator
  throw new RuntimeException(String.join(", ", errorMessages));
}

Use a #propagate() method. A sample non-Guava implementation from Sam Beran's Java 8 Blog:

import java.util.concurrent.Callable;

public class Throwables {
    public interface ExceptionWrapper<E> {
        E wrap(Exception e);
    }

    public static <T> T propagate(Callable<T> callable) throws RuntimeException {
        return propagate(callable, RuntimeException::new);
    }

    public static <T, E extends Throwable> T propagate(Callable<T> callable, ExceptionWrapper<E> wrapper) throws E {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw wrapper.wrap(e);
        }
    }
}
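As a usage sketch, a helper like this could be applied to the question's code as follows. The single-argument `propagate` is reproduced in compact form so the snippet compiles on its own; `PropagateDemo`, the `account` factory and the unwrapping in `activeNumbers` are assumptions added for illustration, not part of the blog's code.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

public class PropagateDemo {

    // Compact copy of the single-argument propagate() shown above.
    public static <T> T propagate(Callable<T> callable) {
        try {
            return callable.call();
        } catch (RuntimeException e) {
            throw e;                       // unchecked: rethrow unchanged
        } catch (Exception e) {
            throw new RuntimeException(e); // checked: wrap, original kept as cause
        }
    }

    // Stand-in for the Account interface from the question.
    public interface Account {
        boolean isActive() throws IOException;
        String getNumber() throws IOException;
    }

    // Illustrative factory: a null number simulates an I/O failure.
    public static Account account(String number, boolean active) {
        return new Account() {
            @Override public boolean isActive() { return active; }
            @Override public String getNumber() throws IOException {
                if (number == null) {
                    throw new IOException("number unavailable");
                }
                return number;
            }
        };
    }

    // Keeps the original signature by unwrapping the IOException again.
    // Caveat: any RuntimeException whose cause is an IOException is unwrapped,
    // not only the ones created by propagate().
    public static Set<String> activeNumbers(List<Account> accounts) throws IOException {
        try {
            return accounts.stream()
                    .filter(a -> propagate(() -> a.isActive()))
                    .map(a -> propagate(() -> a.getNumber()))
                    .collect(Collectors.toSet());
        } catch (RuntimeException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException) e.getCause();
            }
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        List<Account> accounts =
                Arrays.asList(account("A-1", true), account("B-2", false));
        System.out.println(activeNumbers(accounts)); // prints [A-1]
    }
}
```

Catching around the terminal operation matters here: intermediate operations such as filter and map are lazy, so the lambdas only run (and only throw) once collect is called.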

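You can roll your own Stream variant by wrapping your lambda so that it throws an unchecked exception, and then unwrapping that unchecked exception again in the terminal operations: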

@FunctionalInterface
public interface ThrowingPredicate<T, X extends Throwable> {
    public boolean test(T t) throws X;
}

@FunctionalInterface
public interface ThrowingFunction<T, R, X extends Throwable> {
    public R apply(T t) throws X;
}

@FunctionalInterface
public interface ThrowingSupplier<R, X extends Throwable> {
    public R get() throws X;
}

public interface ThrowingStream<T, X extends Throwable> {
    public ThrowingStream<T, X> filter(
            ThrowingPredicate<? super T, ? extends X> predicate);

    public <R> ThrowingStream<R, X> map(
            ThrowingFunction<? super T, ? extends R, ? extends X> mapper);

    public <A, R> R collect(Collector<? super T, A, R> collector) throws X;

    // etc
}

class StreamAdapter<T, X extends Throwable> implements ThrowingStream<T, X> {
    private static class AdapterException extends RuntimeException {
        public AdapterException(Throwable cause) {
            super(cause);
        }
    }

    private final Stream<T> delegate;
    private final Class<X> x;

    StreamAdapter(Stream<T> delegate, Class<X> x) {
        this.delegate = delegate;
        this.x = x;
    }

    private <R> R maskException(ThrowingSupplier<R, X> method) {
        try {
            return method.get();
        } catch (Throwable t) {
            if (x.isInstance(t)) {
                throw new AdapterException(t);
            } else {
                throw t;
            }
        }
    }

    @Override
    public ThrowingStream<T, X> filter(ThrowingPredicate<T, X> predicate) {
        return new StreamAdapter<>(
                delegate.filter(t -> maskException(() -> predicate.test(t))), x);
    }

    @Override
    public <R> ThrowingStream<R, X> map(ThrowingFunction<T, R, X> mapper) {
        return new StreamAdapter<>(
                delegate.map(t -> maskException(() -> mapper.apply(t))), x);
    }

    private <R> R unmaskException(Supplier<R> method) throws X {
        try {
            return method.get();
        } catch (AdapterException e) {
            throw x.cast(e.getCause());
        }
    }

    @Override
    public <A, R> R collect(Collector<T, A, R> collector) throws X {
        return unmaskException(() -> delegate.collect(collector));
    }
}

You can then use it just like a Stream:

Stream<Account> s = accounts.values().stream();
ThrowingStream<Account, IOException> ts = new StreamAdapter<>(s, IOException.class);
return ts.filter(Account::isActive).map(Account::getNumber).collect(toSet());

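This solution requires quite a bit of boilerplate, so I suggest you take a look at the library I have already written, which does exactly what I describe here for the entire Stream class (and more!).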

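Extending @marcg's solution, you can throw and catch checked exceptions in Streams as usual; that is, the compiler will ask you to catch or rethrow them, just as if you were outside the stream!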

@FunctionalInterface
public interface Predicate_WithExceptions<T, E extends Exception> {
    boolean test(T t) throws E;
}

/**
 * .filter(rethrowPredicate(t -> t.isActive()))
 */
public static <T, E extends Exception> Predicate<T> rethrowPredicate(Predicate_WithExceptions<T, E> predicate) throws E {
    return t -> {
        try {
            return predicate.test(t);
        } catch (Exception exception) {
            return throwActualException(exception);
        }
    };
}

@SuppressWarnings("unchecked")
private static <T, E extends Exception> T throwActualException(Exception exception) throws E {
    throw (E) exception;
}

Your example would then look like this (tests added to show it more clearly):

@Test
public void testPredicate() throws MyTestException {
    List<String> nonEmptyStrings = Stream.of("ciao", "")
            .filter(rethrowPredicate(s -> notEmpty(s)))
            .collect(toList());
    assertEquals(1, nonEmptyStrings.size());
    assertEquals("ciao", nonEmptyStrings.get(0));
}

private class MyTestException extends Exception { }

private boolean notEmpty(String value) throws MyTestException {
    if(value==null) {
        throw new MyTestException();
    }
    return !value.isEmpty();
}

@Test
public void testPredicateRaisingException() throws MyTestException {
    try {
        Stream.of("ciao", null)
                .filter(rethrowPredicate(s -> notEmpty(s)))
                .collect(toList());
        fail();
    } catch (MyTestException e) {
        //OK
    }
}
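Applied to the Bank example from the question, the same pattern might look like the sketch below. `Function_WithExceptions` and `rethrowFunction` are assumed counterparts for `map`, written by analogy with the predicate version above; `RethrowDemo` and the `account` factory are hypothetical too. The trick relies on type erasure: the cast `(E) exception` is unchecked and not verified at run time, so the original checked exception leaves the lambda unwrapped.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class RethrowDemo {

    @FunctionalInterface
    public interface Predicate_WithExceptions<T, E extends Exception> {
        boolean test(T t) throws E;
    }

    // Assumed counterpart for map(), following the same pattern.
    @FunctionalInterface
    public interface Function_WithExceptions<T, R, E extends Exception> {
        R apply(T t) throws E;
    }

    public static <T, E extends Exception> Predicate<T> rethrowPredicate(
            Predicate_WithExceptions<T, E> predicate) throws E {
        return t -> {
            try {
                return predicate.test(t);
            } catch (Exception exception) {
                return throwActualException(exception);
            }
        };
    }

    public static <T, R, E extends Exception> Function<T, R> rethrowFunction(
            Function_WithExceptions<T, R, E> function) throws E {
        return t -> {
            try {
                return function.apply(t);
            } catch (Exception exception) {
                return throwActualException(exception);
            }
        };
    }

    // The cast is erased at run time, so the original exception is thrown as is.
    @SuppressWarnings("unchecked")
    private static <T, E extends Exception> T throwActualException(Exception exception) throws E {
        throw (E) exception;
    }

    // Stand-in for the Account interface from the question.
    public interface Account {
        boolean isActive() throws IOException;
        String getNumber() throws IOException;
    }

    // Illustrative factory: a null number simulates an I/O failure.
    public static Account account(String number, boolean active) {
        return new Account() {
            @Override public boolean isActive() { return active; }
            @Override public String getNumber() throws IOException {
                if (number == null) {
                    throw new IOException("number unavailable");
                }
                return number;
            }
        };
    }

    // The compiler still forces this method to declare (or catch) IOException.
    public static Set<String> activeNumbers(List<Account> accounts) throws IOException {
        return accounts.stream()
                .filter(rethrowPredicate(a -> a.isActive()))
                .map(rethrowFunction(a -> a.getNumber()))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) throws IOException {
        List<Account> accounts =
                Arrays.asList(account("A-1", true), account("B-2", false));
        System.out.println(activeNumbers(accounts)); // prints [A-1]
    }
}
```

One thing to keep in mind with this design: the `throws E` is reported where `rethrowPredicate` or `rethrowFunction` is called, while the exception is actually raised later, when the terminal operation runs. If the stream is built in one method and consumed in another, a checked exception can surface in code that does not declare it.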
