Перейти к основному содержанию

Datastream

Обзор

Datastream - это чрезвычайно быстрая реализация реактивных потоков. Он полезен для внутрисерверной и межсерверной связи и асинхронной обработки данных.

Характеристики

  • Современная реализация асинхронных реактивных потоков (в отличие от потоков в Java 8 и традиционных блокирующих потоков на основе тредов)
  • Асинхронный с чрезвычайно эффективным управлением back-pressure, чтобы справиться с естественным дисбалансом в скорости источников данных
  • Композиционно совместимые потоковые операции (отобразители, редукторы, фильтры, сортировщики, слияния/разделения, сжатие, сериализация)
  • Потоковый сетевой и файловый ввод-вывод поверх модуля Eventloop.
  • Совместимость с модулем CSP

Сравнение с CSP

У Datastream много общего с модулем CSP. Хотя они оба были разработаны для обработки ввода-вывода, существует несколько важных различий:

DatastreamCSP
Overhead:Крайне низкая стоимость: поток может быть запущен с 1 виртуального вызова, оценка короткого замыкания оптимизирует производительностьНет оценки короткого замыкания, overhead выше
Скорость пропускной способности:Чрезвычайно быстро (360 880 548 операций в секунду)Быстро (105 932 203 оп/с), но медленнее, чем Datastream.
Оптимизировано для:Небольшие фрагменты данныхОбъекты среднего размера, ByteBufs
Модель программирования:Более сложнаяПростота и удобство

Для обеспечения максимальной эффективности ActiveJ широко использует комбинации CSP и Datastream. Для этого ChannelSupplier, ChannelConsumer, StreamSupplier и StreamConsumer имеют методы transformWith() и специальные интерфейсы Transformer. Используя эти методы и интерфейсы, вы можете легко преобразовывать каналы в другие каналы или потоки данных и наоборот, создавая цепочки таких преобразований.

Бенчмарки

Мы измерили производительность Datastream (StreamSupplier передает 100M Integer объектов сценарию StreamConsumer ) и получили следующий результат:

Время: 2771 мс; Среднее время: 277,1 мс; Лучшее время: 275 мс; Худшее время: 281 мс; Операции в секунду: 360 880 548

Мы также измерили производительность TCP-сервера, использующего как Datastream, так и CSP, и получили средний результат 47 495 905 запросов в секунду.

Вы можете найти исходные файлы бенчмарков на GitHub

Примеры

note

Чтобы запустить примеры, необходимо клонировать ActiveJ с GitHub

git clone https://github.com/activej/activej

И импортируйте его как проект Maven. Посмотрите тег v5.5. Перед запуском примеров выполните сборку проекта. Эти примеры расположены по адресу activej/examples/core/datastream.

Простой поставщик

Когда вы запускаете SupplierExampleвы увидите следующий результат:

Потребитель получен: [0, 1, 2, 3, 4]

Этот вывод представляет собой данные, которые наш пользователь StreamSupplier предоставил StreamConsumer. Давайте посмотрим на реализацию :

``java url=/examples/core/datastream/src/main/java/SupplierExample.java tag=EXAMPLE

**<Githublink url='/examples/core/datastream/src/main/java/SupplierExample.java'>Полный текст примера смотрите на GitHub.</Githublink>**### Простой потребительКогда вы запускаете <Githublink url='/examples/core/datastream/src/main/java/ConsumerExample.java'>`ConsumerExample`</Githublink>вы увидите следующий результат:
```bashполучено: 1получено: 2получено: 3Конец потока получен```
`ConsumerExample` представляет пользовательский потребитель, который расширяет <Githublink url='/core-datastream/src/main/java/io/activej/datastream/AbstractStreamConsumer.java'>`AbstractStreamConsumer`</Githublink> и просто распечатывает полученные данные. Управление процессом потока осуществляется с помощью переопределенных методов `onStarted()`, `onEndOfStream()` и `onError()`:
````mdx-code-block``java url=/examples/core/datastream/src/main/java/ConsumerExample.java tag=EXAMPLE
**<Githublink url='/examples/core/datastream/src/main/java/ConsumerExample.java'>Полный текст примера смотрите на GitHub.</Githublink>**### Индивидуальный трансформатор[<Githublink url='/examples/core/datastream/src/main/java/TransformerExample.java'>`Пример трансформатора`</Githublink>](#) показывает, как создать пользовательский `StreamTransformer` , который принимает строки из входного потока и преобразует их в свою длину, если она меньше заданной `MAX_LENGTH`. Во-первых, мы определяем `AbstractStreamConsumer` и <Githublink url='/core-datastream/src/main/java/io/activej/datastream/AbstractStreamSupplier.java'>`AbstractStreamSupplier`</Githublink>:
````mdx-code-block``java url=/examples/core/datastream/src/main/java/TransformerExample.java tag=REGION_1
Теперь мы определим метод `main` , который создает поставщика тестовых данных, экземпляр `TransformerExample` и <Githublink url='/core-datastream/src/main/java/io/activej/datastream/StreamConsumerToList.java'>`StreamConsumerToList`</Githublink> Далее мы определяем последовательность преобразования и вывода:
````mdx-code-block``java url=/examples/core/datastream/src/main/java/TransformerExample.java tag=REGION_2
Если вы запустите пример, то получите следующий результат:
```java[8, 9]```
**<Githublink url='/examples/core/datastream/src/main/java/TransformerExample.java'>Полный текст примера смотрите на GitHub.</Githublink>**
### Встроенные потоковые узлы
[<Githublink url='/examples/core/datastream/src/main/java/BuiltinNodesExample.java'>`BuiltinStreamNodesExample`</Githublink>](#) демонстрирует несколько простых примеров использования встроенных узлов потока данных.
The first one is <Githublink url='/core-datastream/src/main/java/io/activej/datastream/processor/StreamFilter.java'>`StreamFilter`</Githublink> It allows to apply a function to the input and then stream the function's result to the destination. In this particular example `StreamFilter` filters input numbers and then streams to consumer only odd numbers.
````mdx-code-block```java url=/examples/core/datastream/src/main/java/BuiltinNodesExample.java tag=REGION_1```

The output for this example is [1, 3, 5, 7, 9], while the graph of streams is pretty simple and looks as follows:

graph TD id1(StreamSuppliers$OfIterator)-->id2(StreamFilter$Input)-->id3([StreamFilter])-->id4(StreamFilter$Output)-->id5(StreamConsumerToList) style id1 stroke:#3578ep,stroke-width:2px style id2 stroke:#fa383e,stroke-width:2px style id3 stroke:#a4a6a8,stroke-width:2px style id4 stroke:#3578ep,stroke-width:2px style id5 stroke:#fa383e,stroke-width:2px

Another built-in stream node is StreamFilter::mapper

It changes each input item according to the given function, for example:

private static void mapper() {  //creating a supplier of 10 numbers  StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  //creating a mapper for the numbers  StreamFilter<Integer, String> simpleMap = StreamFilter.mapper(x -> x + " times ten = " + x * 10);
  //creating a consumer which converts received values to list  StreamConsumerToList<String> consumer = StreamConsumerToList.create();
  //applying the mapper to supplier and streaming the result to consumer  supplier.transformWith(simpleMap).streamTo(consumer);
  //when consumer completes receiving values, the result is printed out  consumer.getResult().whenResult(v -> System.out.println(v));}

The output for this example is

[    1 times ten = 10,    2 times ten = 20,    3 times ten = 30,    4 times ten = 40,    5 times ten = 50,    6 times ten = 60,    7 times ten = 70,    8 times ten = 80,    9 times ten = 90,    10 times ten = 100]

and the graph of streams looks as follows:

graph TD id1(StreamSuppliers$OfIterator)-->id2(StreamMapper$Input)-->id3([StreamMapper])-->id4(StreamMapper$Output)-->id5(StreamConsumerToList) style id1 stroke:#3578ep,stroke-width:2px style id2 stroke:#fa383e,stroke-width:2px style id3 stroke:#a4a6a8,stroke-width:2px style id4 stroke:#3578ep,stroke-width:2px style id5 stroke:#fa383e,stroke-width:2px

Let's take a look at another built-in node StreamSplitter

It's a stream transformer that distributes input streams according to the provided function. In this example it distributes 10 numbers between 3 consumers.

private static void splitter() {  StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
  ToIntFunction<Object> hashSharder = item -> (item.hashCode() & Integer.MAX_VALUE) % 3;  //creating a sharder of three parts for three consumers  StreamSplitter<Integer, Integer> sharder = StreamSplitter.create(      (item, acceptors) -> acceptors[hashSharder.applyAsInt(item)].accept(item));
  StreamConsumerToList<Integer> first = StreamConsumerToList.create();  StreamConsumerToList<Integer> second = StreamConsumerToList.create();  StreamConsumerToList<Integer> third = StreamConsumerToList.create();
  sharder.newOutput().streamTo(first);  sharder.newOutput().streamTo(second);  sharder.newOutput().streamTo(third);
  supplier.streamTo(sharder.getInput());
  first.getResult().whenResult(x -> System.out.println("first: " + x));  second.getResult().whenResult(x -> System.out.println("second: " + x));  third.getResult().whenResult(x -> System.out.println("third: " + x));}
note

In this example we can't use a shortcut method transformWith as it can be used only with the transformers that have exactly one input and one output streams.

The output for this example is

first: [3, 6, 9]second: [1, 4, 7, 10]third: [2, 5, 8]`

And the streams graph looks as follows:

graph TD; id7(StreamSuppliers$OfIterator):::blue-->id8(StreamSplitter$Input); id8-->StreamSplitter:::grey; StreamSplitter-->id1(StreamSplitter$Output); StreamSplitter-->id2(StreamSplitter$Output); StreamSplitter-->id3(StreamSplitter$Output); id1-->id4(StreamConsumerToList); id2-->id5(StreamConsumerToList); id3-->id6(StreamConsumerToList);

You can provide any function in the lambda when you create StreamSplitter. For example, to create a transformer that will send all input data to all the consumers, simply create the following lambda expression:

(item, acceptors) -> {for (StreamDataAcceptor<Integer> acceptor : acceptors) { acceptor.accept(item);}}

In this case the output will be

first: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]second: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]third: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Finally, let's take a look at the StreamUnion It works as an opposite to the StreamSplitter and unifies several input streams into one output:

private static void union() {  //creating three suppliers of numbers  StreamSupplier<Integer> source0 = StreamSupplier.of(1, 2);  StreamSupplier<Integer> source1 = StreamSupplier.of();  StreamSupplier<Integer> source2 = StreamSupplier.of(3, 4, 5);
  //creating a unifying transformer  StreamUnion<Integer> streamUnion = StreamUnion.create();
  //creating a consumer which converts received values to list  StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();
  //stream the sources into new inputs of the unifier  source0.streamTo(streamUnion.newInput());  source1.streamTo(streamUnion.newInput());  source2.streamTo(streamUnion.newInput());
  //and stream the output of the unifier into the consumer  streamUnion.getOutput().streamTo(consumer);
  //when consumer completes receiving values, the result is printed out  consumer.getResult().whenResult(v -> System.out.println(v));}

The output for this example is [1, 2, 3, 4, 5] and the graph of streams looks as follows:

graph TD; id1(StreamSuppliers$OfIterator)-->id2(StreamUnion$Input); id3(StreamSuppliers$OfIterator)-->id4(StreamUnion$Input); id5(StreamSuppliers$OfIterator)-->id6(StreamUnion$Input); id2-->StreamUnion; id4-->StreamUnion; id6-->StreamUnion; StreamUnion-->StreamUnion$Output; StreamUnion$Output-->StreamConsumerToList;

See full example on GitHub