Datastream

Overview

Datastream is an extremely fast implementation of reactive streams. It is useful for intra- and inter-server communication and asynchronous data processing.

Datastream is:

  • A modern implementation of asynchronous reactive streams (unlike Java 8 streams and traditional thread-based blocking streams)
  • Asynchronous, with extremely efficient back-pressure control to handle a natural imbalance in the speed of data sources
  • Composable stream operations (mappers, reducers, filters, sorters, mergers/splitters, compression, serialization)
  • Stream-based network and file I/O on top of the Eventloop module
  • Compatibility with the CSP module
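
The back-pressure model can be illustrated with a minimal plain-Java sketch (a conceptual toy, not ActiveJ's actual API): the supplier pushes items only while the consumer keeps it resumed, and stops as soon as the consumer suspends it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Toy pull-based stream: the supplier only emits while the consumer
// keeps it in the "resumed" state - the essence of back-pressure.
public class BackpressureSketch {
	static class ToySupplier<T> {
		private final Iterator<T> items;
		private Consumer<T> acceptor; // non-null while resumed

		ToySupplier(Iterable<T> items) { this.items = items.iterator(); }

		void resume(Consumer<T> acceptor) {
			this.acceptor = acceptor;
			// push items until the consumer suspends us or we run out
			while (this.acceptor != null && items.hasNext()) {
				this.acceptor.accept(items.next());
			}
		}

		void suspend() { this.acceptor = null; }
	}

	public static void main(String[] args) {
		ToySupplier<Integer> supplier = new ToySupplier<>(List.of(1, 2, 3, 4, 5));
		List<Integer> received = new ArrayList<>();
		// the consumer suspends after 3 items, e.g. because its buffer is full
		supplier.resume(x -> {
			received.add(x);
			if (received.size() == 3) supplier.suspend();
		});
		System.out.println("first batch: " + received); // [1, 2, 3]
		supplier.resume(received::add); // consumer is ready again
		System.out.println("all: " + received); // [1, 2, 3, 4, 5]
	}
}
```

In ActiveJ the same suspend/resume handshake happens between real stream nodes, as the AbstractStreamSupplier and AbstractStreamConsumer examples below show.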

Datastream has a lot in common with the CSP module. Although both were designed for I/O processing, there are several important distinctions:

  • Overhead: Datastream's is extremely low (a stream can be started with a single virtual call, and short-circuit evaluation optimizes performance); CSP has no short-circuit evaluation, so its overhead is higher.
  • Throughput speed: Datastream is extremely fast (360 880 548 ops/sec); CSP is fast (105 932 203 ops/sec), but slower than Datastream.
  • Optimized for: Datastream favors small pieces of data; CSP favors medium-sized objects and ByteBufs.
  • Programming model: Datastream's is more complicated; CSP's is simple and convenient.

To provide maximum efficiency, ActiveJ widely utilizes combinations of CSP and Datastream. For this purpose, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. Using them, you can seamlessly transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
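
The idea behind transformWith() can be sketched in plain Java (a conceptual toy: the Transformer interface and ToyStream class below are illustrative stand-ins, not ActiveJ's actual types). A stream hands itself to a transformer, which may return a stream of a different type, so such calls chain into pipelines:

```java
import java.util.List;
import java.util.stream.Collectors;

// Toy illustration of the transformWith pattern: a stream exposes
// transformWith(), which hands the whole stream to a Transformer and
// returns whatever the transformer produces - possibly a stream of a
// different type. Chaining such calls builds a transformation pipeline.
public class TransformWithSketch {
	interface Transformer<I, O> {
		O transform(I input);
	}

	static class ToyStream<T> {
		final List<T> items;

		ToyStream(List<T> items) { this.items = items; }

		<R> R transformWith(Transformer<ToyStream<T>, R> transformer) {
			return transformer.transform(this);
		}
	}

	public static void main(String[] args) {
		// a transformer that turns a stream of strings into a stream of lengths
		Transformer<ToyStream<String>, ToyStream<Integer>> toLengths =
				input -> new ToyStream<>(input.items.stream()
						.map(String::length)
						.collect(Collectors.toList()));

		ToyStream<Integer> result =
				new ToyStream<>(List.of("a", "bb", "ccc")).transformWith(toLengths);
		System.out.println(result.items); // [1, 2, 3]
	}
}
```

ActiveJ's real Transformer interfaces work on asynchronous channels and streams rather than in-memory lists, but the composition pattern is the same.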

Benchmarks

We’ve measured Datastream performance in a scenario where a StreamSupplier streams 100M Integer objects to a StreamConsumer, and received the following result:

Time: 2771ms; Average time: 277.1ms; Best time: 275ms; Worst time: 281ms; Operations per second: 360 880 548

We’ve also measured the performance of a TCP server that uses both Datastream and CSP, and got an average result of 47 495 905 requests per second.

You can find the benchmark sources on GitHub.

Examples

Note: To run the examples, you need to clone ActiveJ from GitHub:
$ git clone https://github.com/activej/activej
Then import it as a Maven project and check out the master branch. Build the project before running the examples.
These examples are located at activej -> examples -> core -> datastream.

Simple Supplier

When you run SupplierExample, you’ll see the following output:

Consumer received: [0, 1, 2, 3, 4]

This output represents the data that our custom StreamSupplier provided to StreamConsumer. Let’s have a look at the implementation:

public final class SupplierExample {
	public static void main(String[] args) {

		//create an eventloop for stream operations
		Eventloop eventloop = Eventloop.create().withCurrentThread();
		//create a supplier of some numbers
		StreamSupplier<Integer> supplier = StreamSupplier.of(0, 1, 2, 3, 4);
		//create a consumer for our supplier
		StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

		//streaming supplier's numbers to consumer
		supplier.streamTo(consumer);

		//when stream completes, streamed data is printed out
		consumer.getResult().whenResult(result -> System.out.println("Consumer received: " + result));

		//start eventloop
		eventloop.run();
	}
}

See the full example sources on GitHub.

Simple Consumer

When you run ConsumerExample, you’ll see the following output:

received: 1
received: 2
received: 3
End of stream received

ConsumerExample implements a custom consumer that extends AbstractStreamConsumer and simply prints out the received data. The streaming process is managed with the overridden methods onStarted(), onEndOfStream(), and onError():

public final class ConsumerExample<T> extends AbstractStreamConsumer<T> {
	@Override
	protected void onStarted() {
		resume(x -> System.out.println("received: " + x));
	}

	@Override
	protected void onEndOfStream() {
		System.out.println("End of stream received");
		acknowledge();
	}

	@Override
	protected void onError(Throwable t) {
		System.out.println("Error handling logic must be here. No confirmation to upstream is needed");
	}

See the full example sources on GitHub.

Custom Transformer

TransformerExample shows how to create a custom StreamTransformer that takes strings from the input stream and transforms each of them to its length, if that length is less than the defined MAX_LENGTH. First, we define an AbstractStreamConsumer and an AbstractStreamSupplier:

private final AbstractStreamConsumer<String> input = new AbstractStreamConsumer<String>() {
	@Override
	protected void onEndOfStream() {
		output.sendEndOfStream();
	}
};

private final AbstractStreamSupplier<Integer> output = new AbstractStreamSupplier<Integer>() {
	@Override
	protected void onResumed() {
		input.resume(item -> {
			int len = item.length();
			if (len < MAX_LENGTH) {
				output.send(len);
			}
		});
	}

	@Override
	protected void onSuspended() {
		input.suspend();
	}
};

Now we define the main method, which creates a supplier of test data, an instance of TransformerExample, and a StreamConsumerToList. Next, we define the sequence of transformations and print the output:

public static void main(String[] args) {
	Eventloop eventloop = Eventloop.create().withCurrentThread().withFatalErrorHandler(rethrowOnAnyError());

	StreamSupplier<String> source = StreamSupplier.of("testdata", "testdata1", "testdata1000");
	TransformerExample transformer = new TransformerExample();
	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	source.transformWith(transformer).streamTo(consumer);
	consumer.getResult().whenResult(v -> System.out.println(v));

	eventloop.run();
}

If you run the example, you’ll receive the following output:

[8, 9]
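
Stripped of the streaming machinery, the transformation logic is equivalent to a plain map-and-filter. Here MAX_LENGTH is assumed to be 10, since the excerpt doesn't show its actual value (the output [8, 9] only tells us it lies between 10 and 12):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// The per-item logic of TransformerExample without the streaming machinery:
// map each string to its length, keep only lengths below MAX_LENGTH.
public class TransformLogic {
	// assumed value; "testdata1000" (length 12) must be filtered out
	private static final int MAX_LENGTH = 10;

	public static void main(String[] args) {
		List<Integer> result = Stream.of("testdata", "testdata1", "testdata1000")
				.map(String::length)
				.filter(len -> len < MAX_LENGTH)
				.collect(Collectors.toList());
		System.out.println(result); // [8, 9]
	}
}
```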

See the full example sources on GitHub.

Built-in Stream Nodes

BuiltinStreamNodesExample demonstrates some simple examples of utilizing built-in datastream nodes.

The first one is StreamFilter. It streams to the destination only the items that match a given predicate. In this particular example, StreamFilter tests the input numbers and streams only the odd ones to the consumer.

private static void filter() {
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	StreamFilter<Integer> filter = StreamFilter.create(input -> input % 2 == 1);

	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	supplier.transformWith(filter).streamTo(consumer);

	consumer.getResult().whenResult(v -> System.out.println(v));
}

The output for this example is [1, 3, 5, 7, 9], while the graph of streams is pretty simple and looks as follows:

Another built-in stream node is StreamMapper. It changes each input item according to the given function, for example:

private static void mapper() {
	//creating a supplier of 10 numbers
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	//creating a mapper for the numbers
	StreamMapper<Integer, String> simpleMap = StreamMapper.create(x -> x + " times ten = " + x * 10);

	//creating a consumer which converts received values to list
	StreamConsumerToList<String> consumer = StreamConsumerToList.create();

	//applying the mapper to supplier and streaming the result to consumer
	supplier.transformWith(simpleMap).streamTo(consumer);

	//when consumer completes receiving values, the result is printed out
	consumer.getResult().whenResult(v -> System.out.println(v));
}

The output for this example is [1 times ten = 10, 2 times ten = 20, 3 times ten = 30, 4 times ten = 40, 5 times ten = 50, 6 times ten = 60, 7 times ten = 70, 8 times ten = 80, 9 times ten = 90, 10 times ten = 100], and the graph of streams looks as follows:

Let’s take a look at another built-in node, StreamSplitter. It’s a stream transformer that distributes items of the input stream among its outputs according to the provided function. In this example it distributes 10 numbers among 3 consumers.

private static void splitter() {
	StreamSupplier<Integer> supplier = StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

	ToIntFunction<Object> hashSharder = item -> (item.hashCode() & Integer.MAX_VALUE) % 3;
	//creating a sharder of three parts for three consumers
	StreamSplitter<Integer, Integer> sharder = StreamSplitter.create(
			(item, acceptors) -> acceptors[hashSharder.applyAsInt(item)].accept(item));

	StreamConsumerToList<Integer> first = StreamConsumerToList.create();
	StreamConsumerToList<Integer> second = StreamConsumerToList.create();
	StreamConsumerToList<Integer> third = StreamConsumerToList.create();

	sharder.newOutput().streamTo(first);
	sharder.newOutput().streamTo(second);
	sharder.newOutput().streamTo(third);

	supplier.streamTo(sharder.getInput());

	first.getResult().whenResult(x -> System.out.println("first: " + x));
	second.getResult().whenResult(x -> System.out.println("second: " + x));
	third.getResult().whenResult(x -> System.out.println("third: " + x));
}

Note: In this example we can't use the shortcut method transformWith, as it can only be used with transformers that have exactly one input stream and one output stream.

The output for this example is:

first: [3, 6, 9]
second: [1, 4, 7, 10]
third: [2, 5, 8]
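
The grouping follows directly from the sharding arithmetic: Integer.hashCode() returns the value itself, so each number n lands in shard (n & Integer.MAX_VALUE) % 3. A quick plain-Java check:

```java
import java.util.ArrayList;
import java.util.List;

// Reproduce the splitter's grouping with the sharding arithmetic alone:
// Integer.hashCode() is the value itself, so n goes to shard
// (n & Integer.MAX_VALUE) % 3.
public class SharderArithmetic {
	public static void main(String[] args) {
		List<List<Integer>> shards =
				List.of(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
		for (int n = 1; n <= 10; n++) {
			shards.get((Integer.valueOf(n).hashCode() & Integer.MAX_VALUE) % 3).add(n);
		}
		System.out.println("first: " + shards.get(0));  // multiples of 3
		System.out.println("second: " + shards.get(1)); // remainder 1
		System.out.println("third: " + shards.get(2));  // remainder 2
	}
}
```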

And the streams graph looks as follows:

You can provide any function in the lambda when you create StreamSplitter. For example, to create a transformer that will send all input data to all the consumers, simply create the following lambda expression:

(item, acceptors) -> {
	for (StreamDataAcceptor<Integer> acceptor : acceptors) {
		acceptor.accept(item);
	}
}

In this case the output will be:

first: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
second: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
third: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Finally, let’s take a look at StreamUnion. It works as the opposite of StreamSplitter, unifying several input streams into one output:

private static void union() {
	//creating three suppliers of numbers
	StreamSupplier<Integer> source0 = StreamSupplier.of(1, 2);
	StreamSupplier<Integer> source1 = StreamSupplier.of();
	StreamSupplier<Integer> source2 = StreamSupplier.of(3, 4, 5);

	//creating a unifying transformer
	StreamUnion<Integer> streamUnion = StreamUnion.create();

	//creating a consumer which converts received values to list
	StreamConsumerToList<Integer> consumer = StreamConsumerToList.create();

	//stream the sources into new inputs of the unifier
	source0.streamTo(streamUnion.newInput());
	source1.streamTo(streamUnion.newInput());
	source2.streamTo(streamUnion.newInput());

	//and stream the output of the unifier into the consumer
	streamUnion.getOutput().streamTo(consumer);

	//when consumer completes receiving values, the result is printed out
	consumer.getResult().whenResult(v -> System.out.println(v));
}

The output for this example is [1, 2, 3, 4, 5] and the graph of streams looks as follows:

See the full example sources on GitHub.