Datastream

Overview

Datastream is a high-performance implementation of reactive streams. It is useful for intra- and inter-server communication and for asynchronous data processing.

Features

  • Modern implementation of asynchronous reactive streams (as opposed to streams in Java 8 and traditional thread-based blocking streams)
  • Asynchronous with effective backpressure control to cope with the natural imbalance in the speed of data sources
  • Composable stream operations (mappers, reducers, filters, sorters, mergers/splitters, compression, serialization)
  • Stream-based network and file I/O on top of reactors (Eventloop module)
  • Compatibility with CSP module
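
The backpressure idea from the list above can be illustrated with a minimal plain-Java sketch. It does not use the ActiveJ API: ToySupplier and consumeInBatches are hypothetical names invented for this illustration, and the sketch is synchronous, whereas Datastream runs on an eventloop. It only shows the principle that a supplier pushes items while the consumer is resumed and stops as soon as the consumer suspends it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical toy classes for illustration only; these are not ActiveJ types.
final class BackpressureSketch {

    /** A toy push-based source that emits only while an acceptor is installed. */
    static final class ToySupplier<T> {
        private final Queue<T> pending;
        private Consumer<T> acceptor; // null means "suspended"

        ToySupplier(List<T> items) {
            this.pending = new ArrayDeque<>(items);
        }

        /** Installs an acceptor and pushes items until drained or suspended. */
        void resume(Consumer<T> newAcceptor) {
            this.acceptor = newAcceptor;
            while (this.acceptor != null && !pending.isEmpty()) {
                this.acceptor.accept(pending.poll());
            }
        }

        /** Called by the consumer when it cannot take more data right now. */
        void suspend() {
            this.acceptor = null;
        }

        int remaining() {
            return pending.size();
        }
    }

    /** Consumes all items, suspending the supplier after every batchSize items. */
    static List<List<Integer>> consumeInBatches(List<Integer> items, int batchSize) {
        ToySupplier<Integer> supplier = new ToySupplier<>(items);
        List<List<Integer>> batches = new ArrayList<>();
        while (supplier.remaining() > 0) {
            List<Integer> batch = new ArrayList<>();
            supplier.resume(item -> {
                batch.add(item);
                if (batch.size() == batchSize) {
                    supplier.suspend(); // the consumer is "full": stop the flow
                }
            });
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        // prints [[0, 1], [2, 3], [4]]
        System.out.println(consumeInBatches(List.of(0, 1, 2, 3, 4), 2));
    }
}
```

The supplier never buffers on behalf of the consumer: once suspend() is called, the remaining items stay at the source until the next resume().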

Comparison to CSP

Datastream has a lot in common with the CSP module. Although both were designed for I/O processing, there are a few important differences:

|                   | Datastream | CSP |
|-------------------|------------|-----|
| Overhead          | Low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance | No short-circuit evaluation, overhead is higher |
| Throughput speed  | Very fast (360 880 548 ops/sec) | Fast (105 932 203 ops/sec), but slower than Datastream |
| Optimized for     | Small pieces of data | Medium-sized objects, ByteBufs |
| Programming model | More complicated | Simple and convenient |

ActiveJ makes extensive use of combinations of CSP and Datastream for maximum efficiency. To do this, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. Using these methods and interfaces, you can easily transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
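
The transformWith() pattern described above can be sketched in plain Java. This is an illustration of the idea only, not the ActiveJ interfaces: Source, Transformer, mapper and collectToList are hypothetical stand-ins. The point is that transformWith() hands the object to the transformer and returns whatever the transformer builds, so the result can be another source or a completely different type, and calls can be chained.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical stand-ins for illustration; these are not ActiveJ types.
final class TransformWithSketch {

    /** Turns an input object (for example a source) into some other object. */
    interface Transformer<I, O> {
        O transform(I input);
    }

    /** A toy source of items that exposes transformWith(). */
    static final class Source<T> {
        final List<T> items;

        Source(List<T> items) {
            this.items = items;
        }

        // Pass "this" to the transformer and return its result unchanged.
        <R> R transformWith(Transformer<Source<T>, R> transformer) {
            return transformer.transform(this);
        }
    }

    /** A transformer that maps a source into another source. */
    static <T, R> Transformer<Source<T>, Source<R>> mapper(Function<T, R> fn) {
        return src -> new Source<>(src.items.stream().map(fn).collect(Collectors.toList()));
    }

    /** A transformer that leaves the "source world" and yields a plain list. */
    static <T> Transformer<Source<T>, List<T>> collectToList() {
        return src -> src.items;
    }

    static List<Integer> demo() {
        Source<String> words = new Source<>(List.of("a", "bb", "ccc"));
        Source<Integer> lengths = words.transformWith(mapper(String::length));
        return lengths.transformWith(collectToList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

Because the return type is chosen by the transformer, the same method can express source-to-source steps and conversions between different abstractions.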

Benchmarks

We measured the performance of Datastream in a scenario where a StreamSupplier streams 100M Integer objects to a StreamConsumer, and got the following result:

Time: 2771ms; Average time: 277.1ms; Best time: 275ms; Worst time: 281ms; Operations per second: 360 880 548
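
The reported figures are consistent with each other, as the short calculation below shows. It assumes the benchmark ran 10 rounds of 100M items each; the round count is not stated in the text and is inferred here from total time divided by average time (2771 ms / 277.1 ms). The class and method names are illustrative.

```java
// Illustrative arithmetic check; the round count of 10 is an assumption.
final class ThroughputSketch {

    /** Operations per second = total items processed / total seconds elapsed. */
    static long opsPerSecond(long itemsPerRound, long rounds, long totalMillis) {
        return itemsPerRound * rounds * 1000 / totalMillis;
    }

    public static void main(String[] args) {
        // 100M items per round, 10 rounds (assumed), 2771 ms in total
        System.out.println(opsPerSecond(100_000_000L, 10, 2771)); // prints 360880548
    }
}
```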

We also measured the performance of the TCP server that uses both Datastream and CSP and got an average result of 47 495 905 requests per second.

You can find the benchmark sources on GitHub.

Examples

Note: To run the examples, clone ActiveJ from GitHub:

git clone https://github.com/activej/activej

Then import it as a Maven project and check out the tag v6.0-beta2. Build the project before running the examples, which are located at activej/examples/core/datastream.

Simple Supplier

When you run SupplierExample, you'll see the following output:

Consumer received: [0, 1, 2, 3, 4]

This output represents the data that our custom StreamSupplier provided to StreamConsumer. Let's take a look at the implementation:

public final class SupplierExample {
    public static void main(String[] args) {

        //create an eventloop for streams operations
        Eventloop eventloop = Eventloop.builder()
                .withCurrentThread()
                .build();
        //create a supplier of some numbers
        StreamSupplier<Integer> supplier = StreamSuppliers.ofValues(0, 1, 2, 3, 4);
        //creating a consumer for our supplier
        ToListStreamConsumer<Integer> consumer = ToListStreamConsumer.create();

        //streaming supplier's numbers to consumer
        supplier.streamTo(consumer);

        //when stream completes, streamed data is printed out
        consumer.getResult().whenResult(result -> System.out.println("Consumer received: " + result));

        //start eventloop
        eventloop.run();
    }
}

See full example on GitHub

Simple Consumer

When you run ConsumerExample, you'll see the following output:

received: 1
received: 2
received: 3
End of stream received

ConsumerExample represents a custom consumer that extends AbstractStreamConsumer and simply prints the received data. The stream process is managed by the overridden onStarted(), onEndOfStream() and onError() methods:

public final class ConsumerExample<T> extends AbstractStreamConsumer<T> {
    @Override
    protected void onStarted() {
        resume(x -> System.out.println("received: " + x));
    }

    @Override
    protected void onEndOfStream() {
        System.out.println("End of stream received");
        acknowledge();
    }

    @Override
    protected void onError(Exception t) {
        System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
    }

    // the rest of the class is omitted in this excerpt
}

See full example on GitHub

Custom Transformer

TransformerExample shows how to create a custom StreamTransformer that takes strings from the input stream and emits the length of each string, but only if that length is less than the given MAX_LENGTH. First, we define an AbstractStreamConsumer for the input and an AbstractStreamSupplier for the output:

private final AbstractStreamConsumer<String> input = new AbstractStreamConsumer<>() {
    @Override
    protected void onEndOfStream() {
        output.sendEndOfStream();
    }
};

private final AbstractStreamSupplier<Integer> output = new AbstractStreamSupplier<>() {
    @Override
    protected void onResumed() {
        input.resume(item -> {
            int len = item.length();
            if (len < MAX_LENGTH) {
                output.send(len);
            }
        });
    }

    @Override
    protected void onSuspended() {
        input.suspend();
    }
};

Now we define the main method, which creates a supplier of test data and an instance of TransformerExample. It then applies the transformer to the supplier, collects the results into a list, and prints them:

public static void main(String[] args) {
    Eventloop eventloop = Eventloop.builder()
            .withCurrentThread()
            .withFatalErrorHandler(rethrow())
            .build();

    StreamSupplier<String> source = StreamSuppliers.ofValues("testdata", "testdata1", "testdata1000");
    TransformerExample transformer = new TransformerExample();

    source.transformWith(transformer)
            .toList()
            .whenResult(v -> System.out.println(v));

    eventloop.run();
}

If you run the example, you'll receive the following output:

[8, 9]

See full example on GitHub
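
To see why the custom transformer prints [8, 9], the per-item logic can be reproduced in plain Java without any streams. This sketch is not part of the ActiveJ example: the class and method names are illustrative, and MAX_LENGTH = 10 is an assumption, since the excerpt does not show the constant's definition (any value from 10 to 12 would give the same output for this input).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the per-item rule; not the ActiveJ example itself.
final class LengthTransformSketch {

    // Assumed value: the excerpt above does not show how MAX_LENGTH is defined.
    static final int MAX_LENGTH = 10;

    /** Maps each string to its length and keeps only lengths below MAX_LENGTH. */
    static List<Integer> lengthsBelowMax(List<String> items) {
        List<Integer> out = new ArrayList<>();
        for (String item : items) {
            int len = item.length();
            if (len < MAX_LENGTH) {
                out.add(len);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // "testdata" has 8 characters, "testdata1" has 9, "testdata1000" has 12
        System.out.println(lengthsBelowMax(List.of("testdata", "testdata1", "testdata1000"))); // prints [8, 9]
    }
}
```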

Built-in Stream Nodes

BuiltinStreamNodesExample demonstrates several of the built-in datastream nodes.

The first of these is StreamTransformers#filter. It applies a predicate to each input item and streams only the items that satisfy it to the destination. In this particular example, the stream transformer filters the input numbers and streams only the odd numbers to the consumer.

private static void filter() {
    StreamSupplier<Integer> supplier = StreamSuppliers.ofValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    StreamTransformer<Integer, Integer> filter = StreamTransformers.filter(input -> input % 2 == 1);

    ToListStreamConsumer<Integer> consumer = ToListStreamConsumer.create();

    supplier.transformWith(filter).streamTo(consumer);

    consumer.getResult().whenResult(v -> System.out.println(v));
}

The output for this example is [1, 3, 5, 7, 9], while the graph of streams is pretty simple and looks as follows:

StreamSuppliers#ofIterator --> Filter$Input --> Filter --> Filter$Output --> ToListStreamConsumer

Another built-in stream node is StreamTransformers#mapper.

It changes each input item according to a given function, for example:

private static void mapper() {
    //creating a supplier of 10 numbers
    StreamSupplier<Integer> supplier = StreamSuppliers.ofValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    //creating a mapper for the numbers
    StreamTransformer<Integer, String> simpleMap = StreamTransformers.mapper(x -> x + " times ten = " + x * 10);

    //creating a consumer which converts received values to list
    ToListStreamConsumer<String> consumer = ToListStreamConsumer.create();

    //applying the mapper to supplier and streaming the result to consumer
    supplier.transformWith(simpleMap).streamTo(consumer);

    //when consumer completes receiving values, the result is printed out
    consumer.getResult().whenResult(v -> System.out.println(v));
}

The output for this example is

[
1 times ten = 10,
2 times ten = 20,
3 times ten = 30,
4 times ten = 40,
5 times ten = 50,
6 times ten = 60,
7 times ten = 70,
8 times ten = 80,
9 times ten = 90,
10 times ten = 100
]

and the graph of streams looks as follows:

StreamSuppliers#ofIterator --> Mapper$Input --> Mapper --> Mapper$Output --> ToListStreamConsumer

Let's take a look at another built-in node, StreamSplitter.

This is a stream transformer that distributes the items of an input stream across several output streams according to a given function. In this example it distributes 10 numbers between 3 consumers.

private static void splitter() {
    StreamSupplier<Integer> supplier = StreamSuppliers.ofValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

    ToIntFunction<Object> hashSharder = item -> (item.hashCode() & Integer.MAX_VALUE) % 3;
    //creating a sharder of three parts for three consumers
    StreamSplitter<Integer, Integer> sharder = StreamSplitter.create(
            (item, acceptors) -> acceptors[hashSharder.applyAsInt(item)].accept(item));

    ToListStreamConsumer<Integer> first = ToListStreamConsumer.create();
    ToListStreamConsumer<Integer> second = ToListStreamConsumer.create();
    ToListStreamConsumer<Integer> third = ToListStreamConsumer.create();

    sharder.newOutput().streamTo(first);
    sharder.newOutput().streamTo(second);
    sharder.newOutput().streamTo(third);

    supplier.streamTo(sharder.getInput());

    first.getResult().whenResult(x -> System.out.println("first: " + x));
    second.getResult().whenResult(x -> System.out.println("second: " + x));
    third.getResult().whenResult(x -> System.out.println("third: " + x));
}

Note: In this example we can't use the shortcut method transformWith, because it works only with transformers that have exactly one input stream and one output stream.

The output for this example is

first: [3, 6, 9]
second: [1, 4, 7, 10]
third: [2, 5, 8]
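
The distribution above follows directly from the sharding expression, which can be checked in plain Java without any streams. The sketch below is an illustration, not ActiveJ code: ShardingSketch, shard and distribute are hypothetical names. It also shows why the hash code is masked with Integer.MAX_VALUE: the mask clears the sign bit, so a negative hash code cannot produce a negative array index.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of the sharding rule; not ActiveJ code.
final class ShardingSketch {

    /** Same expression as hashSharder in the example, with a configurable shard count. */
    static int shard(Object item, int shards) {
        return (item.hashCode() & Integer.MAX_VALUE) % shards;
    }

    /** Groups the integers from..to (inclusive) by their shard index. */
    static List<List<Integer>> distribute(int from, int to, int shards) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < shards; i++) {
            result.add(new ArrayList<>());
        }
        for (int item = from; item <= to; item++) {
            result.get(shard(item, shards)).add(item);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints [[3, 6, 9], [1, 4, 7, 10], [2, 5, 8]]
        System.out.println(distribute(1, 10, 3));
        // Without the mask, -1 % 3 is -1 in Java; with it the index is valid.
        System.out.println(shard(-1, 3)); // prints 1
    }
}
```

An Integer's hash code is its own value, which is why the numbers 1 to 10 land in shards by their remainder modulo 3.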

And the streams graph looks as follows:

StreamSuppliers#ofIterator --> StreamSplitter$Input --> StreamSplitter
StreamSplitter --> StreamSplitter$Output --> ToListStreamConsumer (first)
StreamSplitter --> StreamSplitter$Output --> ToListStreamConsumer (second)
StreamSplitter --> StreamSplitter$Output --> ToListStreamConsumer (third)

When creating a StreamSplitter, you can specify any function in the lambda. For example, to create a transformer that sends all input data to all consumers, just create the following lambda expression:

(item, acceptors) -> {
    for (StreamDataAcceptor<Integer> acceptor : acceptors) {
        acceptor.accept(item);
    }
}

In this case the output will be

first: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
second: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
third: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Finally, let's take a look at StreamUnion. It works as the opposite of StreamSplitter and combines several input streams into one output:

private static void union() {
    //creating three suppliers of numbers
    StreamSupplier<Integer> source0 = StreamSuppliers.ofValues(1, 2);
    StreamSupplier<Integer> source1 = StreamSuppliers.empty();
    StreamSupplier<Integer> source2 = StreamSuppliers.ofValues(3, 4, 5);

    //creating a unifying transformer
    StreamUnion<Integer> streamUnion = StreamUnion.create();

    //creating a consumer which converts received values to list
    ToListStreamConsumer<Integer> consumer = ToListStreamConsumer.create();

    //stream the sources into new inputs of the unifier
    source0.streamTo(streamUnion.newInput());
    source1.streamTo(streamUnion.newInput());
    source2.streamTo(streamUnion.newInput());

    //and stream the output of the unifier into the consumer
    streamUnion.getOutput().streamTo(consumer);

    //when consumer completes receiving values, the result is printed out
    consumer.getResult().whenResult(v -> System.out.println(v));
}

The output for this example is [1, 2, 3, 4, 5] and the graph of streams looks as follows:

StreamSuppliers#ofIterator --> StreamUnion$Input --> StreamUnion
StreamSuppliers#ofIterator --> StreamUnion$Input --> StreamUnion
StreamSuppliers#ofIterator --> StreamUnion$Input --> StreamUnion
StreamUnion --> StreamUnion$Output --> ToListStreamConsumer

See full example on GitHub