跳到主要内容

数据流

概述

Datastream是一个非常快速的反应式流的实现。 它对服务器内和服务器间的 通信和异步数据处理非常有用。

特点

  • 一个现代的异步反应流的实现(不同于Java 8中的流和传统的基于线程的阻塞流)。
  • 具有极其有效的背压控制的异步性,以处理数据源速度的自然不平衡问题
  • 可组合的流操作(映射器、还原器、过滤器、分类器、合并/拆分器、压缩、序列化)。
  • Eventloop模块之上的基于流的网络和文件I/O
  • CSP模块相兼容

与CSP的比较

Datastream与CSP模块有很多共同之处。 虽然它们都是为I/O处理而设计的,但有几个重要的区别。

数据流CSP
俯视。极低:流可以用1个虚拟调用启动,短路评估优化了性能没有短路评估,开销较大
吞吐速度。速度极快 (360 880 548 ops/sec)快 (105 932 203 ops/sec),但比Datastream慢。
优化的是。小块的数据中等规模的物体, ByteBufs
编程模式。更加复杂简单而方便

为了提供最大的效率,ActiveJ广泛利用了CSP和Datastream的组合。 为了这个目的。 渠道供应商, 渠道消费者"。, 流动供应商"。流水线消费者"(StreamConsumer)。transformWith() 方法和特殊的Transformer接口。 使用这些方法和接口,你可以将通道无缝地转换为其他通道或数据流,反之亦然,创建这种转换的链。

基准

我们已经测量了Datastream的性能(StreamSupplier 将100M的整数对象流向 StreamConsumer 场景) ,并得到了以下结果。

时间:2771ms;平均时间:277.1ms;最佳时间:275ms;最差时间:281ms;每秒操作数:360 880 548

我们还测量了同时使用Datastream和CSP的TCP服务器性能,得到的平均结果是: 47 495 905次/秒

你可以在以下网站找到基准来源 GitHub

实例

note

要运行例子,你需要从GitHub克隆ActiveJ

git clone https://github.com/activej/activej

并将其作为一个Maven项目导入。 查看标签 v5.4.3。 在运行这些例子之前,先建立项目。 这些例子位于 activej/examples/core/datastream。

简单的供应商

当你运行 "SupplierExample"(供应商实例)。,你会看到以下输出。

收到的消费者。[0, 1, 2, 3, 4]

这个输出代表了我们自定义的 StreamSupplier 提供给 StreamConsumer的数据。 让我们来看看 ,看一下执行情况。

public final class SupplierExample {  public static void main(String[] args) {
    //create an eventloop for streams operations    Eventloop eventloop = Eventloop.create().withCurrentThread();    //create a supplier of some numbers    StreamSupplier<Integer> supplier = StreamSupplier.of(0, 1, 2, 3, 4);    //creating a consumer for our supplier    StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();
    //streaming supplier's numbers to consumer    supplier.streamTo(consumer);
    //when stream completes, streamed data is printed out    consumer.getResult().whenResult(result -> System.out.println("Consumer received: " + result));
    //start eventloop    eventloop.run();  }}

在GitHub上看到完整的例子

简单的消费者

当你运行 消费者实例,你会看到以下输出。

收到。1收到。2收到。3收到的流结束

ConsumerExample 表示一个自定义的消费者,它扩展了 AbstractStreamConsumer 而只是打印出收到的数据。 流媒体过程是通过重载方法 onStarted(), onEndOfStream()onError()来管理。

public final class ConsumerExample<T> extends AbstractStreamConsumer<T> {  @Override  protected void onStarted() {    resume(x -> System.out.println("received: " + x));  }
  @Override  protected void onEndOfStream() {    System.out.println("End of stream received");    acknowledge();  }
  @Override  protected void onError(Exception t) {    System.out.println("Error handling logic must be here. No confirmation to upstream is needed");  }

在GitHub上看到完整的例子

定制变压器

变换器实例 显示了如何创建一个自定义的 StreamTransformer ,它从输入流中获取字符串,如果其长度小于定义的 MAX_LENGTH,则对其进行变换。 首先,我们定义 AbstractStreamConsumer抽象流供应商:

private final AbstractStreamConsumer<String> input = new AbstractStreamConsumer<String>() {  @Override  protected void onEndOfStream() {    output.sendEndOfStream();  }};
private final AbstractStreamSupplier<Integer> output = new AbstractStreamSupplier<Integer>() {  @Override  protected void onResumed() {    input.resume(item -> {      int len = item.length();      if (len < MAX_LENGTH) {        output.send(len);      }    });  }
  @Override  protected void onSuspended() {    input.suspend();  }};

现在我们定义 main 方法,它创建一个测试数据的供应者,一个 TransformerExample 的实例和 StreamConsumerToList 接下来,我们定义转换和输出的顺序。

public static void main(String[] args) {  Eventloop eventloop = Eventloop.create().withCurrentThread().withEventloopFatalErrorHandler(rethrow());
  StreamSupplier<String> source = StreamSupplier.of("testdata", "testdata1", "testdata1000");  TransformerExample transformer = new TransformerExample();  StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();
  source.transformWith(transformer).streamTo(consumer);  consumer.getResult().whenResult(v -> System.out.println(v));
  eventloop.run();}

如果你运行这个例子,你会收到以下输出。

[8, 9]

在GitHub上看到完整的例子

内置流节点

BuiltinStreamNodesExample 展示了一些利用内置数据流节点的简单例子。

第一个是 流媒体过滤器"。 它允许将一个函数应用于输入,然后将该函数的结果流向目的地。 在这个 特定的例子中 StreamFilter 过滤输入的数字,然后流向消费者只有奇数。

private static void filter() {  StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  StreamFilter<Integer, Integer> filter = StreamFilter.create(input -> input % 2 == 1);
  StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();
  supplier.transformWith(filter).streamTo(consumer);
  consumer.getResult().whenResult(v -> System.out.println(v));}

这个例子的输出是 [1, 3, 5, 7, 9],而流的图形相当简单,看起来如下。

graph TD id1(StreamSuppliers$OfIterator)-->id2(StreamFilter$Input)-->id3([StreamFilter])-->id4(StreamFilter$Output)-->id5(StreamConsumerToList) style id1 stroke:#3578ep,stroke-width:2px style id2 stroke:#fa383e,stroke-width:2px style id3 stroke:#a4a6a8,stroke-width:2px style id4 stroke:#3578ep, stroke-width:2px style id5 stroke:#fa383e, stroke-width:2px

另一个内置的流节点是 "StreamFilter::mapper"。

它根据给定的函数改变每个输入项,例如。

private static void mapper() {  //creating a supplier of 10 numbers  StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  //creating a mapper for the numbers  StreamFilter<Integer, String> simpleMap = StreamFilter.mapper(x -> x + " times ten = " + x * 10);
  //creating a consumer which converts received values to list  StreamConsumerToList<String> consumer = StreamConsumerToList.create();
  //applying the mapper to supplier and streaming the result to consumer  supplier.transformWith(simpleMap).streamTo(consumer);
  //when consumer completes receiving values, the result is printed out  consumer.getResult().whenResult(v -> System.out.println(v));}

这个例子的输出是

[    1次十=10,    2次十=20,    3次十=30,    4次十=40,    5次十=50,    6次十=60,    7次十=70,    8次十=80,    9次十=90,    10次十=100]

和流的图形看起来如下。

graph TD id1(StreamSuppliers$OfIterator)-->id2(StreamMapper$Input)-->id3([StreamMapper])-->id4(StreamMapper$Output)-->id5(StreamConsumerToList) style id1 stroke:#3578ep,stroke-width:2px style id2 stroke:#fa383e,stroke-width:2px style id3 stroke:#a4a6a8,stroke-width:2px style id4 stroke:#3578ep, stroke-width:2px style id5 stroke:#fa383e, stroke-width:2px

让我们看一下另一个内置节点 "StreamSplitter"(分流器)

它是一个流转化器,根据提供的函数分配输入流。 在这个例子中,它 ,在3个消费者之间分配10个数字。

private static void splitter() {  StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  ToIntFunction<Object> hashSharder = item -> (item.hashCode() & Integer.MAX_VALUE) % 3;  //creating a sharder of three parts for three consumers  StreamSplitter<Integer, Integer> sharder = StreamSplitter.create(      (item, acceptors) -> acceptors[hashSharder.applyAsInt(item)].accept(item));
  StreamConsumerToList<Integer> first = StreamConsumerToList.create();  StreamConsumerToList<Integer> second = StreamConsumerToList.create();  StreamConsumerToList<Integer> third = StreamConsumerToList.create();
  sharder.newOutput().streamTo(first);  sharder.newOutput().streamTo(second);  sharder.newOutput().streamTo(third);
  supplier.streamTo(sharder.getInput());
  first.getResult().whenResult(x -> System.out.println("first: " + x));  second.getResult().whenResult(x -> System.out.println("second: " + x));  third.getResult().whenResult(x -> System.out.println("third: " + x));}
note

在这个例子中,我们不能使用快捷方法 transformWith ,因为它只能 ,用于正好有一个输入和一个输出流的变换器。

这个例子的输出是

第一。[3,6,9]第二:[1,4,7,10]第三:[2,5,8]`。

而流向图看起来如下。

graph TD; id7(StreamSuppliers$OfIterator):::blue-->id8(StreamSplitter$Input); id8-->StreamSplitter:::grey; StreamSplitter-->id1(StreamSplitter$Output); StreamSplitter-->id2(StreamSplitter$Output); StreamSplitter-->id3(StreamSplitter$Output); id1-->id4(StreamConsumerToList); id2-->id5(StreamConsumerToList); id3-->id6(StreamConsumerToList);

当你创建 StreamSplitter,你可以在lambda中提供任何函数。 对于 ,要创建一个将所有输入数据发送至所有消费者的转化器,只需创建以下lambda表达式。

(item, acceptors) -> {for (StreamDataAcceptor<Integer> acceptor : acceptors) { acceptor.accept(item);}}

在这种情况下,输出将是

第一篇[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]第二次:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]第三次:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

最后,让我们看一下 "StreamUnion 它的作用与 StreamSplitter 相反,并将几个输入流统一为一个输出。

private static void union() {  //creating three suppliers of numbers  StreamSupplier<Integer> source0 = StreamSupplier.of(1, 2);  StreamSupplier<Integer> source1 = StreamSupplier.of();  StreamSupplier<Integer> source2 = StreamSupplier.of(3, 4, 5);
  //creating a unifying transformer  StreamUnion<Integer> streamUnion = StreamUnion.create();
  //creating a consumer which converts received values to list  StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();
  //stream the sources into new inputs of the unifier  source0.streamTo(streamUnion.newInput());  source1.streamTo(streamUnion.newInput());  source2.streamTo(streamUnion.newInput());
  //and stream the output of the unifier into the consumer  streamUnion.getOutput().streamTo(consumer);
  //when consumer completes receiving values, the result is printed out  consumer.getResult().whenResult(v -> System.out.println(v));}

这个例子的输出是 [1, 2, 3, 4, 5] ,流的图形看起来如下。

graph TD; id1(StreamSuppliers$OfIterator)-->id2(StreamUnion$Input); id3(StreamSuppliers$OfIterator)-->id4(StreamUnion$Input); id5(StreamSuppliers$OfIterator)-->id6(StreamUnion$Input); id2-->StreamUnion; id4-->StreamUnion; id6-->StreamUnion; StreamUnion-->StreamUnion$Output; StreamUnion$Output-->StreamConsumerToList;

在GitHub上看到完整的例子