是否有Java 8流操作限制流(可能是无限的),直到第一个元素无法匹配谓词?

在Java 9中,我们可以像下面的例子一样使用takeWhile来打印所有小于10的数字。

IntStream
    .iterate(1, n -> n + 1)
    .takeWhile(n -> n < 10)
    .forEach(System.out::println);

因为在Java 8中没有这样的操作,那么以通用的方式实现它的最佳方法是什么呢?


当前回答

这是从JDK 9 java.util.stream.Stream.takeWhile(Predicate)中复制的源代码。为了使用JDK 8,有一点不同。

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> p) {
    class Taking extends Spliterators.AbstractSpliterator<T> implements Consumer<T> {
        private static final int CANCEL_CHECK_COUNT = 63;
        private final Spliterator<T> s;
        private int count;
        private T t;
        private final AtomicBoolean cancel = new AtomicBoolean();
        private boolean takeOrDrop = true;

        Taking(Spliterator<T> s) {
            super(s.estimateSize(), s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED));
            this.s = s;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean test = true;
            if (takeOrDrop &&               // If can take
                    (count != 0 || !cancel.get()) && // and if not cancelled
                    s.tryAdvance(this) &&   // and if advanced one element
                    (test = p.test(t))) {   // and test on element passes
                action.accept(t);           // then accept element
                return true;
            } else {
                // Taking is finished
                takeOrDrop = false;
                // Cancel all further traversal and splitting operations
                // only if test of element failed (short-circuited)
                if (!test)
                    cancel.set(true);
                return false;
            }
        }

        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }

        @Override
        public void accept(T t) {
            count = (count + 1) & CANCEL_CHECK_COUNT;
            this.t = t;
        }

        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }
    return StreamSupport.stream(new Taking(stream.spliterator()), stream.isParallel()).onClose(stream::close);
}

其他回答

甚至我也有类似的需求——调用web服务,如果失败,重试3次。如果在多次尝试后仍然失败,请发送电子邮件通知。在谷歌搜索了很多之后,anyMatch()成为了救星。我的示例代码如下。在下面的例子中,如果webServiceCall方法在第一次迭代本身中返回true,则stream不会在我们调用anyMatch()时进一步迭代。我相信,这就是你想要的。

import java.util.stream.IntStream;

import io.netty.util.internal.ThreadLocalRandom;

class TrialStreamMatch {

public static void main(String[] args) {        
    if(!IntStream.range(1,3).anyMatch(integ -> webServiceCall(integ))){
         //Code for sending email notifications
    }
}

public static boolean webServiceCall(int i){
    //For time being, I have written a code for generating boolean randomly
    //This whole piece needs to be replaced by actual web-service client code
    boolean bool = ThreadLocalRandom.current().nextBoolean();
    System.out.println("Iteration index :: "+i+" bool :: "+bool);

    //Return success status -- true or false
    return bool;
}

可能有点离题了,但这是List<T>而不是Stream<T>。

首先你需要有一个take util方法。该方法接受前n个元素:

static <T> List<T> take(List<T> l, int n) {
    if (n <= 0) {
        return newArrayList();
    } else {
        int takeTo = Math.min(Math.max(n, 0), l.size());
        return l.subList(0, takeTo);
    }
}

它就像scala。list。take一样

    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3, 4, 5), 3));
    assertEquals(newArrayList(1, 2, 3), take(newArrayList(1, 2, 3), 5));

    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), -1));
    assertEquals(newArrayList(), take(newArrayList(1, 2, 3), 0));

现在,编写一个基于take的takeWhile方法就相当简单了

static <T> List<T> takeWhile(List<T> l, Predicate<T> p) {
    return l.stream().
            filter(p.negate()).findFirst(). // find first element when p is false
            map(l::indexOf).        // find the index of that element
            map(i -> take(l, i)).   // take up to the index
            orElse(l);  // return full list if p is true for all elements
}

它是这样工作的:

    assertEquals(newArrayList(1, 2, 3), takeWhile(newArrayList(1, 2, 3, 4, 3, 2, 1), i -> i < 4));

这个实现部分迭代列表几次,但它不会增加O(n^2)个操作。希望你能接受。

您可以使用java8 + rxjava。

import java.util.stream.IntStream;
import rx.Observable;


// Example 1)
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n ->
          {
                System.out.println(n);
                return n < 10;
          }
    ).subscribe() ;


// Example 2
IntStream intStream  = IntStream.iterate(1, n -> n + 1);
Observable.from(() -> intStream.iterator())
    .takeWhile(n -> n < 10)
    .forEach( n -> System.out.println(n));

如果你有不同的问题,可能需要不同的解决方案,但对于你当前的问题,我只想说:

IntStream
    .iterate(1, n -> n + 1)
    .limit(10)
    .forEach(System.out::println);

这样的操作在Java 8 Stream中应该是可能的,但它不一定能高效地完成——例如,您不一定能将这样的操作并行化,因为您必须按顺序查看元素。

API并没有提供简单的方法,但最简单的方法可能是获取Stream. Iterator(),将迭代器包装为“take-while”实现,然后返回到Spliterator,然后是Stream。或者——可能——包装Spliterator,尽管在这个实现中它不能再被拆分了。

下面是一个未测试的takeWhile在Spliterator上的实现:

static <T> Spliterator<T> takeWhile(
    Spliterator<T> splitr, Predicate<? super T> predicate) {
  return new Spliterators.AbstractSpliterator<T>(splitr.estimateSize(), 0) {
    boolean stillGoing = true;
    @Override public boolean tryAdvance(Consumer<? super T> consumer) {
      if (stillGoing) {
        boolean hadNext = splitr.tryAdvance(elem -> {
          if (predicate.test(elem)) {
            consumer.accept(elem);
          } else {
            stillGoing = false;
          }
        });
        return hadNext && stillGoing;
      }
      return false;
    }
  };
}

static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<? super T> predicate) {
   return StreamSupport.stream(takeWhile(stream.spliterator(), predicate), false);
}