首页>>后端>>java->Java8 Stream源码精讲(四):一文说透四种终止操作

Java8 Stream源码精讲(四):一文说透四种终止操作

时间:2023-12-07 本站 点击:0

简介

Java8 Stream源码精讲(一):从一个简单的例子入手Java8 Stream源码精讲(二):Stream创建原理深度解析Java8 Stream源码精讲(三):中间操作原理详解

上一篇文章中,我们讲了Stream中间操作,通过分析源码的方式了解了无状态操作和有状态操作的区别,每一个中间操作方法是如何实现的,Stream是惰性流,调用中间操作比如filter()、map()等方法不会立即执行声明的lambda表达式,只有通过调用终止操作才会处理Stream中的元素。本章我们将分析终止操作相关源码,深入了解内部原理。

终止操作

在Stream API中有一个接口TerminalOp代表终止操作。

interface TerminalOp<E_IN, R> {    default StreamShape inputShape() { return StreamShape.REFERENCE; }    default int getOpFlags() { return 0; }    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,                                      Spliterator<P_IN> spliterator) {        if (Tripwire.ENABLED)            Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");        return evaluateSequential(helper, spliterator);    }    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,                                Spliterator<P_IN> spliterator);}

接口上定义了两个泛型类型:

E_IN:输入的元素类型

R:结果类型

来看下方法定义:

inputShape():获取操作的输入元素类型,返回枚举StreamShape,译为Stream的形状。可以看到定义了四种枚举,正好对应Stream、IntStream、LongStream、DoubleStream。

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}

evaluateParallel():Stream是并行时,终止操作最终调用这个方法处理数据,我们不关心。

evaluateSequential():串行流时最终调用这个方法处理数据。

通过继承关系可以看到,只有四个类直接实现TerminalOp接口,Stream有这么多终止方法,而实际上可以归类为4个,单独说明一下:toArray()方法也是终止操作,但是与其它不同,没有使用TerminalOp。

现在来看下终止操作划分:

操作类型 方法 非短路操作 forEach() forEachOrdered() toArray() reduce() collect() min() max() count() 短路操作 anyMatch() allMatch() noneMatch() findFirst() findAny()

非短路操作

非短路操作主要有ForEachOp和ReduceOp两个TerminalOp实现,其实toArray()也属于非短路操作,但是它没有依靠实现TerminalOp来完成相应的功能,所以不作讲解,感兴趣的小伙伴可以自己看一下相关源码。

ForEachOp

用于遍历Stream元素的forEach()和forEachOrdered()方法都是通过ForEachOp的子类完成相应工作的。

ForEachOp的继承结构比较复杂,除了实现上面的TerminalOp外,还实现了TerminalSink。TerminalSink聚合了Sink和Supplier接口,这两个大家应该都不陌生,Sink在前面的文章有详细讲解,Supplier是一个函数式接口,用于提供一个结果给调用者。

通过前面的文章和继承结构,我们可以大胆猜测:ForEachOp除了具备终止操作的能力,在数据处理之前,自己还会作为一个sink,与中间操作中的sink实例组成sink链表,通过责任链模式依次处理Stream中的元素。到底是不是这样呢?我们进入源码求证。

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}

重点关注evaluateSequential(),内部会调用PipelineHelper#wrapAndCopyInto()方法,看到这个方法是不是很熟悉,没错就是前面文章中多次提到的处理数据的方法,它的职责是将传入sink与中间操作产生的sink组合成链表,然后调用源Spliterator的方法,发送Stream元素给sink链处理。

我们进入这个方法看一下,它是在AbstractPipeline中实现的:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}

可以看到首先调用wrapSink()方法,将封装有终止操作逻辑的sink再次包装:

final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {    Objects.requireNonNull(sink);    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);    }    return (Sink<P_IN>) sink;}

wrapSink()方法前面也有多次讲到,Stream经过一系列中间操作调用,返回的实际上是一个Stream链表,结构如下:

拿到sink链表之后,再来看看copyInto()方法是如何执行数据处理逻辑的:

final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {    Objects.requireNonNull(wrappedSink);    //非短路操作    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {        //1.begin()方法调用        wrappedSink.begin(spliterator.getExactSizeIfKnown());        //2.forEachRemaining()方法调用,发送元素        spliterator.forEachRemaining(wrappedSink);        //3.end()方法调用,通知结束        wrappedSink.end();    }    //短路操作    else {        copyIntoWithCancel(wrappedSink, spliterator);    }}

因为ForEachOp是非短路操作,所以必定走第一个分支,现在只分析这个逻辑:

在发送元素之前,调用begin()方法,将元素大小传入sink链表,经过有状态中间操作时会初始化相应的变量,这在前一章有详细分析,最终调用终止操作sink。

调用Spliterator#forEachRemaining()方法遍历元素,内部会调用Sink#accept()方法传入每一个元素,同样元素会在sink链表上经过中间操作处理,最后到达终止操作。

end()方法,同理通知结束,做清理工作。

forEach()方法

forEach()方法,调用action函数处理Stream上传递过来的每一个元素,注意在并行流中,元素实际被处理的顺序可能不是Stream的元素顺序。ReferencePipelinet#forEach():

public void forEach(Consumer<? super P_OUT> action) {    evaluate(ForEachOps.makeRef(action, false));}

ForEachOps#makeRef()方法:

public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,                                              boolean ordered) {    Objects.requireNonNull(action);    return new ForEachOp.OfRef<>(action, ordered);}

创建并返回OfRef对象,可以看到它继承自ForEachOp,实现了accept()方法,也就是元素到达终止操作sink时,会调用lambda表达式。

static final class OfRef<T> extends ForEachOp<T> {    final Consumer<? super T> consumer;    OfRef(Consumer<? super T> consumer, boolean ordered) {        super(ordered);        this.consumer = consumer;    }    @Override    public void accept(T t) {        consumer.accept(t);    }}

最后再来看一下AbstractPipeline#evaluate(),每一个终止操作方法内部都会调用evaluate()方法触发Stream上的逻辑执行,在它的内部,又会调用TerminalOp#evaluateSequential()方法:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {    assert getOutputShape() == terminalOp.inputShape();    if (linkedOrConsumed)        throw new IllegalStateException(MSG_STREAM_LINKED);    linkedOrConsumed = true;    return isParallel()           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))           //触发真正的逻辑           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));}

所以通过前面的分析,forEach()方法的流程应该很清晰了:先创建继承自ForEachOp的OfRef对象,然后通过调用AbstractPipeline#evaluate()方法,间接调用ForEachOp#evaluateSequential()方法,将OfRef作为终止操作的sink与中间操作的sink构建成链表,最后通过调用Spliterator#forEachRemaining()方法遍历元素将元素传递给sink链, 而代表forEach()终止操作sink的OfRef对象每接收到上一个sink传递过来的元素都会调用声明的lambda表达式进行处理。

forEachOrdered()方法

forEachOrdered()方法,调用action函数处理Stream上传递过来的每一个元素,在并行流中也能保证元素处理顺序和Stream一致。在串行流中行为和forEach()表现一致。

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}0

ReduceOp

ReduceOp是用于聚合Stream中元素的终止操作,ReduceOp应该是工作中被开发者使用最多的终止操作,比如:reduce()、collect()、max()等等。

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}1

可以看到ReduceOp继承关系比ForEachOp更简单,ReduceOp没有实现TerminalSink接口,所以它自身不包含终止操作的逻辑,是通过调用重写的makeSink()方法拿到TerminalSink,再传入AbstractPipeline#wrapAndCopyInto()进行处理的,具体的逻辑还得在各个终止操作方法中查看。

reduce()方法

reduce()有三个重载方法,我们选择两个重点分析:

reduce(final P_OUT identity, final BinaryOperator accumulator):

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}2

ReduceOps#makeRef():

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}3

reduce()方法确实是创建了一个 ReduceOp实例,它利用ReducingSink来保存初始结果,并且与每一个元素计算之后覆盖之前的结果。最后通过ReducingSink#get()方法作为Stream终止操作的结果返回。

reduce(BinaryOperator accumulator):

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}4

ReduceOps#makeRef():

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}5

可以看到,与上面的reduce()方法不同的是,它没有一个初始结果,所以在ReducingSink中,使用了两个变量来保存状态,第一个元素作为初始结果与后面的元素计算,并且方法声明的返回值是Optional类型,代表这个操作可能是没有结果的。

max()方法

max()方法,使用比较器比较元素中最大的值并返回。max()和min()方法都是对reduce()方法的特殊应用。

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}6

max()方法调用了reduce()方法,我们看下BinaryOperator#maxBy()返回什么:

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}7

逻辑很简单,就是利用比较器比较当前元素和前面所有元素的较大值,并作为最终结果返回。

min()方法

min()方法,使用比较器比较元素中最小的值并返回。逻辑跟max()方法基本完全一样。

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}8

collect()方法

上面说ReduceOp是工作中用到的最多的终止操作,那collect()方法应该就是ReduceOp操作中使用最频繁的终止操作方法了。collect()也有两个重载方法,我们重点看带Collector参数的方法。

Collector中文意思是收集器,它提供了获取Supplier、BiConsumer和BinaryOperator等函数的方法,Collector常用在分组、规约、聚合等操作当中。关于Collector这个强大的收集器,我打算放到下一章详细讲解,本章我们先看下collect()是如何使用ReduceOp的。

enum StreamShape {  REFERENCE,  INT_VALUE,  LONG_VALUE,  DOUBLE_VALUE}9

进入ReduceOps#makeRef()方法:

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}0

可以看到跟reduce()方法一样,collect()也是利用ReduceOp来实现的,不过它返回的ReducingSink有些细微的差别,collect()中保持结果的state是由Collector中的supplier提供的,并且处理Stream元素时不会覆盖之前的结果,而是使用Collector#accumulator()方法提供的BiConsumer函数修改state的内部数据。

Stream API通过提供Collector接口,使开发者可以更灵活的处理数据,甚至可以自定义Collector实现类以满足功能。

短路操作

短路操作跟非短路操作的主要区别在于:如果是短路操作,那么不一定需要完整的遍历整个Stream的元素,在某些条件下,可以提前得到结果,提前结束遍历过程。短路操作有MatchOp和FindOp两个TerminalOp实现,分别表示匹配和查找。

MatchOp

MatchOp是一种通过Predicate函数与元素匹配的终止操作,匹配类型包含:ALL、ANY和NONE。

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}1

可以看到实现基本上与其它的TerminalOp相同,只是这里要注意两点:

它通过sinkSupplier工厂方法函数提供的封装终止操作逻辑的sink,讲方法时会详解。

getOpFlags()方法返回StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED,代表短路操作。在触发数据处理逻辑时,通过这个标志走短路的遍历逻辑。

AbstractPipeline#copyInto():

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}2

AbstractPipeline#copyIntoWithCancel():

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}3

短路操作通过遍历pipeline链表,拿到Head节点对象,然后调用它的forEachWithCancel()方法,当然也会在此前后调用begin()和end()方法,就不再展开了。

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}4

其实forEachWithCancel()就是在do-while循环中调用spliterator#tryAdvance()方法遍历元素,不过每遍历一个元素之前都要满足一个条件:!sink.cancellationRequested(),表示请求没有被取消。下面讲解方法时会细说。

anyMatch()方法

anyMatch()方法,Stream中的元素与Predicate函数匹配,如果有任何一个元素匹配则返回true,否则返回false。

下面进入源码,分析这个方法具体如何实现的,ReferencePipeline#anyMatch():

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}5

同样的套路,通过工厂方法MatchOps#makeRef()创建MatchOp对象:

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}6

MatchSink的父类BooleanTerminalSink:

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}7

可以看到返回的确实是MatchOp实例,MatchSink是封装终止操作逻辑的sink。先看一下MatchKind枚举类:

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}8

MatchKind定义了三种枚举类型ANY、ALL、NONE,这里着重关注ANY:字段stopOnPredicateMatches的值为true,如果Predicate函数与元素匹配返回true,则流程终止;字段shortCircuitResult为true,表示符合匹配规则时,返回给调用方的结果为true,否则返回false。

MatchSink通过stop和value变量,以及MatchKind枚举来控制短路和保存匹配结果。

allMatch()方法

allMatch()方法,Stream中的元素与Predicate函数匹配,如果有任何一个元素不匹配,则返回false,否则就表示所有元素都匹配,返回true。注意这里说的比较拗口,主要是为了体现这个方法也是一个短路操作。

ReferencePipeline#allMatch()方法跟anyMatch()差不多,只是传入的MatchKind是ALL类型,就不再展示说明了。

//省略了并行处理相关的代码//泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值static abstract class ForEachOp<T>        implements TerminalOp<T, Void>, TerminalSink<T, Void> {    //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注    private final boolean ordered;    protected ForEachOp(boolean ordered) {        this.ordered = ordered;    }    // TerminalOp    @Override    public int getOpFlags() {        return ordered ? 0 : StreamOpFlag.NOT_ORDERED;    }    @Override    public <S> Void evaluateSequential(PipelineHelper<T> helper,                                       Spliterator<S> spliterator) {        return helper.wrapAndCopyInto(this, spliterator).get();    }    // TerminalSink    @Override    public Void get() {        return null;    }}9

noneMatch()方法

noneMatch()方法,其实就是allMatch()方法的否定,Stream中的元素与Predicate函数匹配,如果有任何一个元素匹配,则返回false,否则就表示所有元素都不匹配,返回true。

ReferencePipeline#noneMatch()方法,创建MatchOp实例时传入的是NONE类型。

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}0

FindOp

FindOp是一种查找Stream中元素的短路操作,当找到一个结果时,终止元素遍历,直接返回结果。

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}1

咋一看,这个类的成员变量非常多,控制逻辑应该很复杂,其实mustFindFirst和presentPredicate都是用于并行流处理过程的。剩下的跟MatchOp非常相似,下面分析具体方法的时候再详解。

findFirst()方法

findFirst()方法,返回一个Optional,这个Optional中的值是Stream的第一个元素,如果Stream中没有元素,则返回Optional#empty()。

在ReferencePipeline#findFirst()方法中,通过FindOps#makeRef()创建FindOp对象:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}2

FindOps#makeRef()方法:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}3

这里的sinkSupplier创建的是FindSink.OfRef实例:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}4

FindSink.OfRef只是实现了get()方法,用于获取查找结果,但是它继承了FindSink:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}5

FindSink通过变量hasValue控制短路流程,当接收到一个元素时,使用value保存结果,并且终止遍历。

findAny()方法

findAny()方法,返回一个Optional,这个Optional中的值是Stream中的一个元素,如果Stream中没有元素,则返回Optional#empty()。注意在并行流调用中,返回的不一定是Stream的第一个元素,但如果是串行流,则行为跟findFirst()一样,Optional中的值就是Stream第一个元素。

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);    return sink;}2

总结

本文先解释了Stream调用过程中,终止操作是所声明的lambda表达式的触发时机,讲解了中间操作调用时的链表封装和终止操作的调用流程。依据是否完整遍历Stream元素,将终止操作划分为非短路操作和短路操作,通过带领大家阅读源码,深入分析了ForEachOp、ReduceOp、MatchOp和FindOp四种终止操作的实现原理,以及每一个终止操作方法是如何实现的。

写在最后

本篇文章是本系列的第四章,相信大家完整阅读之后,对Stream的整体工作流程和原理有了更深入的理解。

在讲解ReduceOp时说到collect()方法拥有分组规约的能力,能够帮助开发者简化很多流程代码、提升开发效率,下一章我们讲深入其中,看看它为何具有如此强大的功能,以及当Collectors中提供的功能不能满足我们的需要时,如何扩展相应的能力。

最后,原创不易,如果觉得本系列文章对您有帮助,能够加深您对Stream原理和源码的理解的话,请不要吝啬您手中的赞(✪ω✪)!

原文:https://juejin.cn/post/7103718952626290695


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/18678.html