Datastream
Обзор
Datastream - это чрезвычайно быстрая реализация реактивных потоков. Он полезен для внутрисерверной и межсерверной связи и асинхронной обработки данных.
Характеристики
- Современная реализация асинхронных реактивных потоков (в отличие от потоков в Java 8 и традиционных блокирующих потоков на основе тредов)
- Асинхронный с чрезвычайно эффективным управлением back-pressure, чтобы справиться с естественным дисбалансом в скорости источников данных
- Композиционно совместимые потоковые операции (отобразители, редукторы, фильтры, сортировщики, слияния/разделения, сжатие, сериализация)
- Потоковый сетевой и файловый ввод-вывод поверх модуля Eventloop.
- Совместимость с модулем CSP
Сравнение с CSP
У Datastream много общего с модулем CSP. Хотя они оба были разработаны для обработки ввода-вывода, существует несколько важных различий:
Datastream | CSP | |
---|---|---|
Overhead: | Крайне низкая стоимость: поток может быть запущен с 1 виртуального вызова, оценка короткого замыкания оптимизирует производительность | Нет оценки короткого замыкания, overhead выше |
Скорость пропускной способности: | Чрезвычайно быстро (360 880 548 операций в секунду) | Быстро (105 932 203 оп/с), но медленнее, чем Datastream. |
Оптимизировано для: | Небольшие фрагменты данных | Объекты среднего размера, ByteBufs |
Модель программирования: | Более сложная | Простота и удобство |
Для обеспечения максимальной эффективности ActiveJ широко использует комбинации CSP и Datastream. Для этого ChannelSupplier
, ChannelConsumer
, StreamSupplier
и StreamConsumer
имеют методы transformWith()
и специальные интерфейсы Transformer. Используя эти методы и интерфейсы, вы можете легко преобразовывать каналы в другие каналы или потоки данных и наоборот, создавая цепочки таких преобразований.
Бенчмарки
Мы измерили производительность Datastream (StreamSupplier
передает 100M Integer объектов сценарию StreamConsumer
) и получили следующий результат:
Время: 2771 мс; Среднее время: 277,1 мс; Лучшее время: 275 мс; Худшее время: 281 мс; Операции в секунду: 360 880 548
Мы также измерили производительность TCP-сервера, использующего как Datastream, так и CSP, и получили средний результат 47 495 905 запросов в секунду.
Вы можете найти исходные файлы бенчмарков на GitHub
Примеры
- Простой поставщик - показывает, как создать простой
StreamSupplier
и передать некоторые данные наStreamConsumer
- Простой потребитель - показывает, как создать простой
StreamConsumer.
- Пользовательский трансформер - показывает, как создать пользовательский
StreamTransformer
, который принимает строки и преобразует их к своей длине, если она меньше, чемMAX_LENGTH
. - Встроенные узлы потока Пример - демонстрирует некоторые встроенные возможности Datastream, такие как фильтрация, разделение, объединение и сопоставление.
note
Чтобы запустить примеры, необходимо клонировать ActiveJ с GitHub
git clone https://github.com/activej/activej
И импортируйте его как проект Maven. Посмотрите тег v5.5. Перед запуском примеров выполните сборку проекта. Эти примеры расположены по адресу activej/examples/core/datastream.
Простой поставщик
Когда вы запускаете SupplierExample
вы увидите следующий результат:
Потребитель получен: [0, 1, 2, 3, 4]
Этот вывод представляет собой данные, которые наш пользователь StreamSupplier
предоставил StreamConsumer
. Давайте посмотрим на реализацию :
``java url=/examples/core/datastream/src/main/java/SupplierExample.java tag=EXAMPLE
**<Githublink url='/examples/core/datastream/src/main/java/SupplierExample.java'>Полный текст примера смотрите на GitHub.</Githublink>**### Простой потребительКогда вы запускаете <Githublink url='/examples/core/datastream/src/main/java/ConsumerExample.java'>`ConsumerExample`</Githublink>вы увидите следующий результат:
```bashполучено: 1получено: 2получено: 3Конец потока получен```
`ConsumerExample` представляет пользовательский потребитель, который расширяет <Githublink url='/core-datastream/src/main/java/io/activej/datastream/AbstractStreamConsumer.java'>`AbstractStreamConsumer`</Githublink> и просто распечатывает полученные данные. Управление процессом потока осуществляется с помощью переопределенных методов `onStarted()`, `onEndOfStream()` и `onError()`:
````mdx-code-block``java url=/examples/core/datastream/src/main/java/ConsumerExample.java tag=EXAMPLE
**<Githublink url='/examples/core/datastream/src/main/java/ConsumerExample.java'>Полный текст примера смотрите на GitHub.</Githublink>**### Индивидуальный трансформатор[<Githublink url='/examples/core/datastream/src/main/java/TransformerExample.java'>`Пример трансформатора`</Githublink>](#) показывает, как создать пользовательский `StreamTransformer` , который принимает строки из входного потока и преобразует их в свою длину, если она меньше заданной `MAX_LENGTH`. Во-первых, мы определяем `AbstractStreamConsumer` и <Githublink url='/core-datastream/src/main/java/io/activej/datastream/AbstractStreamSupplier.java'>`AbstractStreamSupplier`</Githublink>:
````mdx-code-block``java url=/examples/core/datastream/src/main/java/TransformerExample.java tag=REGION_1
Теперь мы определим метод `main` , который создает поставщика тестовых данных, экземпляр `TransformerExample` и <Githublink url='/core-datastream/src/main/java/io/activej/datastream/StreamConsumerToList.java'>`StreamConsumerToList`</Githublink> Далее мы определяем последовательность преобразования и вывода:
````mdx-code-block``java url=/examples/core/datastream/src/main/java/TransformerExample.java tag=REGION_2
Если вы запустите пример, то получите следующий результат:
```java[8, 9]```
**<Githublink url='/examples/core/datastream/src/main/java/TransformerExample.java'>Полный текст примера смотрите на GitHub.</Githublink>**
### Встроенные потоковые узлы
[<Githublink url='/examples/core/datastream/src/main/java/BuiltinNodesExample.java'>`BuiltinStreamNodesExample`</Githublink>](#) демонстрирует несколько простых примеров использования встроенных узлов потока данных.
The first one is <Githublink url='/core-datastream/src/main/java/io/activej/datastream/processor/StreamFilter.java'>`StreamFilter`</Githublink> It allows to apply a function to the input and then stream the function's result to the destination. In this particular example `StreamFilter` filters input numbers and then streams to consumer only odd numbers.
````mdx-code-block```java url=/examples/core/datastream/src/main/java/BuiltinNodesExample.java tag=REGION_1```
The output for this example is [1, 3, 5, 7, 9]
, while the graph of streams is pretty simple and looks as follows:
Another built-in stream node is StreamFilter::mapper
It changes each input item according to the given function, for example:
private static void mapper() { //creating a supplier of 10 numbers StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//creating a mapper for the numbers StreamFilter<Integer, String> simpleMap = StreamFilter.mapper(x -> x + " times ten = " + x * 10);
//creating a consumer which converts received values to list StreamConsumerToList<String> consumer = StreamConsumerToList.create();
//applying the mapper to supplier and streaming the result to consumer supplier.transformWith(simpleMap).streamTo(consumer);
//when consumer completes receiving values, the result is printed out consumer.getResult().whenResult(v -> System.out.println(v));}
The output for this example is
[ 1 times ten = 10, 2 times ten = 20, 3 times ten = 30, 4 times ten = 40, 5 times ten = 50, 6 times ten = 60, 7 times ten = 70, 8 times ten = 80, 9 times ten = 90, 10 times ten = 100]
and the graph of streams looks as follows:
Let's take a look at another built-in node StreamSplitter
It's a stream transformer that distributes input streams according to the provided function. In this example it distributes 10 numbers between 3 consumers.
private static void splitter() { StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
ToIntFunction<Object> hashSharder = item -> (item.hashCode() & Integer.MAX_VALUE) % 3; //creating a sharder of three parts for three consumers StreamSplitter<Integer, Integer> sharder = StreamSplitter.create( (item, acceptors) -> acceptors[hashSharder.applyAsInt(item)].accept(item));
StreamConsumerToList<Integer> first = StreamConsumerToList.create(); StreamConsumerToList<Integer> second = StreamConsumerToList.create(); StreamConsumerToList<Integer> third = StreamConsumerToList.create();
sharder.newOutput().streamTo(first); sharder.newOutput().streamTo(second); sharder.newOutput().streamTo(third);
supplier.streamTo(sharder.getInput());
first.getResult().whenResult(x -> System.out.println("first: " + x)); second.getResult().whenResult(x -> System.out.println("second: " + x)); third.getResult().whenResult(x -> System.out.println("third: " + x));}
note
In this example we can't use a shortcut method transformWith
as it can be used only with the transformers that have exactly one input and one output streams.
The output for this example is
first: [3, 6, 9]second: [1, 4, 7, 10]third: [2, 5, 8]`
And the streams graph looks as follows:
You can provide any function in the lambda when you create StreamSplitter. For example, to create a transformer that will send all input data to all the consumers, simply create the following lambda expression:
(item, acceptors) -> {for (StreamDataAcceptor<Integer> acceptor : acceptors) { acceptor.accept(item);}}
In this case the output will be
first: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]second: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]third: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Finally, let's take a look at the StreamUnion
It works as an opposite to the StreamSplitter
and unifies several input streams into one output:
private static void union() { //creating three suppliers of numbers StreamSupplier<Integer> source0 = StreamSupplier.of(1, 2); StreamSupplier<Integer> source1 = StreamSupplier.of(); StreamSupplier<Integer> source2 = StreamSupplier.of(3, 4, 5);
//creating a unifying transformer StreamUnion<Integer> streamUnion = StreamUnion.create();
//creating a consumer which converts received values to list StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();
//stream the sources into new inputs of the unifier source0.streamTo(streamUnion.newInput()); source1.streamTo(streamUnion.newInput()); source2.streamTo(streamUnion.newInput());
//and stream the output of the unifier into the consumer streamUnion.getOutput().streamTo(consumer);
//when consumer completes receiving values, the result is printed out consumer.getResult().whenResult(v -> System.out.println(v));}
The output for this example is [1, 2, 3, 4, 5]
and the graph of streams looks as follows: