Stream流

简介

stream是将要处理的元素集合看做一种流,在流的过程中,借助stream API对流中的元素进行操作,比如:筛选,排序,聚合等

stream可以由数组或集合创建,对流的操作分为俩种

  1. 中间操作,每次返回一个新的流
  2. 终端操作,每个流只能进行一次终端操作,终端操作结束后流无法再次使用。终端操作会产生一个新的集合或值。

特性

  1. stream不存储数据,而是按照特定的规则对数据进行计算,一般会输出结果。
  2. stream不会改变数据源,通常情况下会产生一个新的集合或一个值。
  3. stream具有延迟执行的特性,只调用终端操作时,中间操作才会执行。

Stream和parallelStream的简单区分:

stream和parallelStream的简单区分:stream是顺序流,由主线程按顺序对流执行操作,而parallelStream是并行流,内部以多线程并行执行的方式对流进行操作,但前提是流中的数据处理没有顺序要求。

如果流综合你的数据量足够大,并行流可以加快处理速度。

除了直接创建并行流,还可以通过parallel()把顺序流换成并行流;

Optional<Integer> findFirst = list.stream().parallel().filter(x->x>6).findFirst();

stream的使用

在使用stream之前,先理解一个概念Optional

Optional类是一个可以为null的容器对象。如果值存在则isPresent()方法会返回true,调用get()方法会返回该对象。

为什么使用stream

声明式处理数据

第一个原因我觉得是Stream流可以以声明式的方式去处理数据,也就是像它其中就有filter、sort这种以及写好的操作,只需要拿来使用即可,如果我们平时使用for循环,还要在for循环中自己去写怎么过滤的这些操作,最后才得出自己想要的结果,对比这种命令式的操作

可以说让我们代码更加干净、简洁。

处理集合数据

Stream可以说是Java8中对于处理集合的抽象概念,所以我们经常对集合中的数据采用像SQL这种类似方式去处理;所以经常会用Stream进行遍历操作,那相较于我们以前写的嵌套for循环可以说是代码更加的简洁,更直观易读。当然循环只是循环,而Stream是个流的形式去做处理。那如何去做迭代,那就得看看stream的原理了。

惰性计算

惰性计算我们也可以称作惰性求值或者延迟求值,这种方式在函数式编程中极为常见,也就是当计算出结果后不立马去返回值,而是在它要被用到的时候来计算;

在Stream中,我们就可以看作中间操作,比如当要对一个List集合做出Stream操作,比如filter,但是没有最终操作,它返回的还是一个Stream流。

与collection的不同点

  • 不存储数据。流不是一个存储元素的数据结构。它只是传递源(source)数据
  • 功能性的Functional in natrue 在流上操作只是产生一个结果,不会修改源。例如filter只是生成一个筛选后的stream,不会删除源里的元素。
  • 延迟搜索。许多流操作,如filter,map等,都是延迟执行。中间操作总是lazy的
  • stream可能是无界的。而集合操作是有界的(元素数量有限的大小)。短路操作如limit(n),findFirst()可以再有限的时间内完成无界的stream
  • 可消费的(Consumeable)。意思是元素流在生命周期内只能访问一次。再访问只能重新从源头生成一个Stream

stream原理

也许我们会觉得,Stream的实现是每一次去调用函数,它就会进行一次迭代,这肯定是不对的,这样Stream的效率是很低的。

其实事实是我们可以通过源码来发现它是怎样迭代的,其实Stream内部是通过流水线(Pipeline)的方式来实现的,基本思想是在迭代的时候沿着流水线尽可能执行更多的操作,从而避免多次迭代,有几个问题需要解决

  1. 用户操作如何记录
  2. 操作如何叠加
  3. 叠加之后操作如何执行

关键问题解决

以上我们可以知道stream的晚餐操作,是由一个 数据来源,操作,回调函数 组成的三元组

此外我们还需要知道stream的相关类与接口的继承关系

  • 从图中可以看出我们除了基本数据类型以外,引用类型是通过实例化的ReferencePipline来表示
  • 从而与ReferencePipeline并行三个类是为其基本类型定制的。

操作如何记录

首先JDK源码中经常会用stage阶段来标识一次操作

其次,Stream操作通常需要一个回调函数(Lambda表达式)

从一回事那个我们可以看出,当我们调用stream方法时,最终会去创建一个head实例来操作头,也就是第一个stage,当调用filter()方法时则会创建中间操作实例StateLessOP(无状态),接着调用map方法时会创建中间操作实例StateLessOP,最后调用sort()方法时会创建最终操作实例StatefulOp(有状态),同样调用其他操作对应的方法也会生成一个ReferencePipeline实例,通过调用这一系列操作最终形成一个双向链表,即每个Stage都记录了前一个Stage和本次的操作以及回调函数。

  1. 调用stream,创建head实例

  1. 调用filter或map中间操作
  • 这些中间操作最终都在referencePipeline这个类中,它实现其元素类型的中间管道阶段或管道源阶段的抽象基类。

下面代码逻辑就是将回调函数mapper包装到一个sink当中。由于stream.map()是一个无状态的中间操作,所以map()方法返回了一个statelessOp内部对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。这个Sink就是下面提到的操作如何叠加的方式。

 @Override
@SuppressWarnings("unchecked")
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    Objects.requireNonNull(mapper);
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.apply(u));
                }
            };
        }
    };
}

操作如何叠加

从上面我们知道Stream通过stage记录操作,但stage只保存当前操作,它是不知道怎么操作下一个stage,它有需要什么操作。

所以要执行的话还需要各种协议将stage关联起来。

JDK中就是使用Sink(我们可以称为汇聚节点)接口来实现的,Sink接口定义begin(),end(),cancellationRequested(),accept()四个方法

方法名作用
void begin(long size)开始遍历元素之前调用该方法,通知Sink做好准备。
void end()所有元素遍历完成之后调用,通知Sink没有更多的元素了。
boolean cancellationRequested()是否可以结束操作,可以让短路操作尽早结束。
void accept(T t)遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。

有了上面的协议,相邻Stage之间调用就很方便了,吗,每个Stage都会将自己操作封装到一个Sink里,前一个Stage只需要调用后一个Stage的accept方法即可,并不需要知道其内部是如何处理的.

当然对于有状态的操作,Sink的begin和end方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的Sink.bean()方法可能创建一个盛放结果的容器,accpet方法负责将元素添加到容器,最后end负责对容器进行排序。

对于短路操作,Sink.cancellationRequested()也是必须实现的,比如Stream.findFirst()是短路操作,只要找到一个元素,cancellationRequested()就应该返回true,以便调用者尽快结束查找。

Sink的四个接口方法常常相互协作,共同完成计算任务。

实际上Stream API内部实现的的本质,就是如何重写Sink的这四个接口方法。

有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。

一种可能的Sink.accept()方法流程是这样的:

void accept(U u){
    1. 使用当前Sink包装的回调函数处理u
    2. 将处理结果传递给流水线下游的Sink
}

Sink接口的其他几个方法也是按照这种[处理->转发]的模型实现。

下面我们结合具体例子看看Stream的中间操作是如何将自身的操作包装成Sink以及Sink是如何将处理结果转发给下一个Sink的。

先看Stream.map()方法:

// Stream.map(),调用该方法将产生一个新的Stream
public final  Stream map(Functionsuper P_OUT, ? extends R> mapper) {
    ...
    return new StatelessOp(this, StreamShape.REFERENCE,
                                 StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
        @Override /*opWripSink()方法返回由回调函数包装而成Sink*/
        Sink opWrapSink(int flags, Sink downstream) {
            return new Sink.ChainedReference(downstream) {
                @Override
                public void accept(P_OUT u) {
                    R r = mapper.apply(u);// 1. 使用当前Sink包装的回调函数mapper处理u
                    downstream.accept(r);// 2. 将处理结果传递给流水线下游的Sink
                }
            };
        }
    };
}

上述代码看似复杂,其实逻辑很简单,就是将回调函数mapper包装到一个sink中。由于stream.map()是一个无状态的中间操作,所以map()方法反悔了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。

再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:

// Stream.sort()方法用到的Sink实现
class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    private ArrayList list;// 存放用于排序的元素
    RefSortingSink(Sinksuper T> downstream, Comparatorsuper T> comparator) {
        super(downstream, comparator);
    }
    @Override
    public void begin(long size) {
        ...
        // 创建一个存放排序元素的列表
        list = (size >= 0) ? new ArrayList((int) size) : new ArrayList();
    }
    @Override
    public void end() {
        list.sort(comparator);// 只有元素全部接收之后才能开始排序
        downstream.begin(list.size());
        if (!cancellationWasRequested) {// 下游Sink不包含短路操作
            list.forEach(downstream::accept);// 2. 将处理结果传递给流水线下游的Sink
        }
        else {// 下游Sink包含短路操作
            for (T t : list) {// 每次都调用cancellationRequested()询问是否可以结束处理。
                if (downstream.cancellationRequested()) break;
                downstream.accept(t);// 2. 将处理结果传递给流水线下游的Sink
            }
        }
        downstream.end();
        list = null;
    }
    @Override
    public void accept(T t) {
        list.add(t);// 1. 使用当前Sink包装动作处理t,只是简单的将元素添加到中间列表当中
    }
}

上述代码完美展现了Sink的四个接口方法是如何协同工作的

  1. 首先begin()方法告诉Sink参与排序的元素个数,方便确定中间结果容器大小
  2. 之后通过accept()方法将元素添加到中间结果中,最终执行时调用者会不断调用该方法,直到遍历所有元素
  3. 最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游Sink
  4. 如果下游的Sink是短路操作,将结果传递给下游时不断轮训下游cancellationRequested()是否可以结束处理。

Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式来叠加操作。这一连串的齿轮已经咬合,就差最后一步拨动齿轮启动执行。

是什么启动这一连串的操作呢?也许你已经想到了启动的原始动力就是结束操作(Terminal Operation),一旦调用某个结束操作,就会触发整个流水线的执行。

结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。

结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[处理->转发]模型,结束操作的Sink就是调用链的出口。

我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在PipelineHelper中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。

但Stream类库的设计者没有这么做,而是设置了一个Sink AbstractPipeline.opWrapSink(int flags, Sink downstream)方法来得到Sink,该方法的作用是返回一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?

这是因为使用opWrapSink()可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。

试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink()方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:

// AbstractPipeline.wrapSink()
// 从下游向上游不断包装Sink。如果最初传入的sink代表结束操作,
// 函数返回时就可以得到一个代表了流水线上所有操作的Sink。
final  Sink wrapSink(Sink sink) {
    ...
    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink) sink;
}

现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里,执行这个Sink就相当于执行整个流水线,执行Sink的代码如下:

// AbstractPipeline.copyInto(), 对spliterator代表的数据执行wrappedSink代表的操作。
final  void copyInto(Sink wrappedSink, Spliterator spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());// 通知开始遍历
        spliterator.forEachRemaining(wrappedSink);// 迭代
        wrappedSink.end();// 通知遍历结束
    }
    ...
}

上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代,最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。

执行结果

最后一个问题是流水线上所有操作都执行后,用户所需要的结果(如果有)在哪里?首先要说明的是不是所有的Stream结束操作都需要返回结果,有些操作只是为了使用其副作用(Side-effects),比如使用Stream.forEach()方法将结果打印出来就是常见的使用副作用的场景(事实上,除了打印之外其他场景都应避免使用副作用),对于真正需要返回结果的结束操作结果存在哪里呢?

特别说明:副作用不应该被滥用,也许你会觉得在Stream.forEach()里进行元素收集是个不错的选择,就像下面代码中那样,但遗憾的是这样使用的正确性和效率都无法保证,因为Stream可能会并行执行。大多数使用副作用的地方都可以使用归约操作更安全和有效的完成。

// 错误的收集方式
ArrayList results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
      .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
// 正确的收集方式
Listresults =
     stream.filter(s -> pattern.matcher(s).matches())
             .collect(Collectors.toList());  // No side-effects!

回到流水线执行结果的问题上来,需要返回结果的流水线结果存在哪里呢?这要分不同的情况讨论,下表给出了各种有返回结果的Stream结束操作。

返回类型对应的结束操作
booleananyMatch() allMatch() noneMatch()
OptionalfindFirst() findAny()
归约结果reduce() collect()
数组toArray()

对于表中返回boolean或者Optional的操作(Optional是存放 一个 值的容器)的操作,由于值返回一个值,只需要在对应的Sink中记录这个值,等到执行结束时返回就可以了。

对于归约操作,最终结果放在用户调用时指定的容器中(容器类型通过收集器指定)。collect(), reduce(), max(), min()都是归约操作,虽然max()和min()也是返回一个Optional,但事实上底层是通过调用reduce()方法实现的。

对于返回是数组的情况,毫无疑问的结果会放在数组当中。这么说当然是对的,但在最终返回数组之前,结果其实是存储在一种叫做Node的数据结构中的。Node是一种多叉树结构,元素存储在树的叶子当中,并且一个叶子节点可以存放多个元素。这样做是为了并行执行方便。关于Node的具体结构,我们会在下一节探究Stream如何并行执行时给出详细说明。

Last modification:November 20, 2023
如果觉得我的文章对你有用,请随意赞赏