CSP

Overview

CSP (Communicating Sequential Processes) provides serial I/O communication between reactive data suppliers and consumers. It is used for asynchronous data streaming and was inspired by channels in the Go language.

Features:

  • High performance and throughput
  • Optimized to work with objects of medium size (like ByteBufs)
  • A rich DSL that provides a simple programming model
  • Asynchronous back pressure control

Channel Supplier and Channel Consumer

CSP communication is conducted via ChannelSupplier and ChannelConsumer, which provide and receive data, respectively. Each request to a channel must be made only after the previous request has completed. CSP uses promises to manage this.

ChannelSupplier has a get() method that returns a Promise of the provided value. Until this Promise completes with either a result or an exception, the get() method should not be called again. Also note that if get() returns a Promise of null, this indicates the end of the stream and no additional data should be requested from that supplier.

ChannelConsumer has an accept(@Nullable T value) method that returns a Promise of null, which completes once the value has been accepted. Until this Promise completes, the accept() method should not be called again. As with ChannelSupplier, accepting a null value indicates the end of the stream.

Here is an example of communication between ChannelSupplier and ChannelConsumer:

protected void doProcess() {
    input.get()
            .whenResult(data -> {
                if (data == null) {
                    output.acceptEndOfStream()
                            .whenResult(this::completeProcess);
                } else {
                    data = data.toUpperCase() + '(' + data.length() + ')';

                    output.accept(data)
                            .whenResult(this::doProcess);
                }
            })
            .whenException(Throwable::printStackTrace);
}
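
The calling contract described above can also be illustrated without ActiveJ. The following is only a sketch under stated assumptions: java.util.concurrent.CompletableFuture stands in for the ActiveJ Promise, and ContractSketch, SketchSupplier, SketchConsumer and pump are hypothetical names that are not part of the library. It shows the two rules only: the next get() is issued after the previous accept() has completed, and null marks the end of the stream.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ContractSketch {
    // Hypothetical stand-ins for ChannelSupplier and ChannelConsumer.
    interface SketchSupplier<T> {
        CompletableFuture<T> get();                 // a null result means end of stream
    }

    interface SketchConsumer<T> {
        CompletableFuture<Void> accept(T value);    // a null value means end of stream
    }

    // Pulls one item, pushes it downstream, and only then asks for the next one.
    static <T> CompletableFuture<Void> pump(SketchSupplier<T> in, SketchConsumer<T> out) {
        return in.get().thenCompose(item -> {
            if (item == null) {
                return out.accept(null);            // forward end of stream and stop
            }
            return out.accept(item).thenCompose(done -> pump(in, out));
        });
    }

    public static List<String> demo() {
        Iterator<String> source = List.of("hello", "world").iterator();
        List<String> sink = new ArrayList<>();

        SketchSupplier<String> in = () ->
                CompletableFuture.completedFuture(source.hasNext() ? source.next() : null);
        SketchConsumer<String> out = value -> {
            sink.add(value == null ? "<end>" : value.toUpperCase());
            return CompletableFuture.completedFuture(null);
        };

        pump(in, out).join();
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the futures in this sketch complete synchronously, pump recurses once per item. That is acceptable for a short demonstration, but a long stream would need a loop or an asynchronous hop between items.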

Channel Queue

Another important CSP concept is the ChannelQueue interface and its implementations: ChannelBuffer and ChannelZeroBuffer. They provide a link between Consumers and Suppliers and allow you to build chains of such pipes if needed.

Basically, these buffers transfer objects from ChannelConsumer to ChannelSupplier as soon as there is free space in the queue. This process is controlled by Promises. You can manually set the size of the ChannelBuffer. The ChannelZeroBuffer does not store any values, but simply transfers them one at a time from the ChannelConsumer to the ChannelSupplier.

Here is a simple example of working with buffers:

public void accept(T item) {
    buffer.add(item);
    // The buffer is full: ask the upstream supplier to pause
    if (buffer.isSaturated()) {
        getSupplier().suspend();
    }
}

void produce() {
    while (!buffer.isEmpty()) {
        T item = buffer.poll();
        // A null item marks the end of the stream
        if (item != null) {
            send(item);
        } else {
            sendEndOfStream();
        }
    }
}
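
To make the back pressure idea concrete, here is a minimal sketch of a bounded queue whose put() completes immediately while there is room and otherwise completes only after a later take(). It is not the ActiveJ implementation: BoundedQueueSketch and its methods are hypothetical, CompletableFuture stands in for Promise, and the sketch assumes single-threaded use with at most one pending put() and one pending take() at a time.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

public class BoundedQueueSketch<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private final int capacity;
    private CompletableFuture<Void> pendingPut;   // producer waiting for free space
    private CompletableFuture<T> pendingTake;     // consumer waiting for an item

    public BoundedQueueSketch(int capacity) {
        this.capacity = capacity;
    }

    // Completes at once while there is room; otherwise completes after a later take().
    public CompletableFuture<Void> put(T item) {
        if (pendingTake != null) {                // hand the item straight to a waiting consumer
            CompletableFuture<T> waiting = pendingTake;
            pendingTake = null;
            waiting.complete(item);
            return CompletableFuture.completedFuture(null);
        }
        items.add(item);
        if (items.size() <= capacity) {
            return CompletableFuture.completedFuture(null);
        }
        pendingPut = new CompletableFuture<>();   // over capacity: the producer must wait
        return pendingPut;
    }

    // Completes at once if an item is queued; otherwise completes on the next put().
    public CompletableFuture<T> take() {
        if (items.isEmpty()) {
            pendingTake = new CompletableFuture<>();
            return pendingTake;
        }
        T item = items.poll();
        if (pendingPut != null && items.size() <= capacity) {
            CompletableFuture<Void> waiting = pendingPut;
            pendingPut = null;
            waiting.complete(null);               // space was freed: release the producer
        }
        return CompletableFuture.completedFuture(item);
    }

    public static String demo() {
        BoundedQueueSketch<Integer> plate = new BoundedQueueSketch<>(2);
        boolean first = plate.put(1).isDone();
        boolean second = plate.put(2).isDone();
        CompletableFuture<Void> third = plate.put(3);   // exceeds the capacity of 2
        boolean blocked = !third.isDone();
        int taken = plate.take().join();
        return first + " " + second + " " + blocked + " " + taken + " " + third.isDone();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a capacity of 0 the same sketch behaves like a hand-off: every put() stays pending until the item is taken.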

Comparison to Datastream

The CSP has a lot in common with the Datastream module. Although they were both designed for I/O processing, there are a few important differences:

| | Datastream | CSP |
| --- | --- | --- |
| Overhead | Low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance | No short-circuit evaluation, overhead is higher |
| Throughput speed | Very fast (360 880 548 ops/sec) | Fast (105 932 203 ops/sec), but slower than Datastream |
| Programming model | More complicated | Simple and convenient |

ActiveJ makes extensive use of combinations of CSP and Datastream for maximum efficiency. To do this, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. Using these methods and interfaces, you can easily transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
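
The chaining style that transformWith() enables can be sketched in plain Java. Everything in this sketch is hypothetical (Transformer, Channel, upperCase and lengths are illustration-only names, and the "channel" is just a wrapper around a list), so it shows the shape of the pattern rather than the ActiveJ API: a transformer receives the current pipe and returns a new one, possibly of a different type, which lets calls be chained.

```java
import java.util.List;
import java.util.stream.Collectors;

public class TransformWithSketch {
    // Hypothetical transformer: turns one kind of pipe into another.
    interface Transformer<I, O> {
        O transform(I input);
    }

    // Hypothetical minimal "channel": just a wrapper around a list of items.
    static final class Channel<T> {
        final List<T> items;

        Channel(List<T> items) {
            this.items = items;
        }

        // Hands this channel to the transformer and returns whatever it builds.
        <R> R transformWith(Transformer<Channel<T>, R> fn) {
            return fn.transform(this);
        }
    }

    static Transformer<Channel<String>, Channel<String>> upperCase() {
        return in -> new Channel<>(
                in.items.stream().map(String::toUpperCase).collect(Collectors.toList()));
    }

    static Transformer<Channel<String>, Channel<Integer>> lengths() {
        return in -> new Channel<>(
                in.items.stream().map(String::length).collect(Collectors.toList()));
    }

    public static String demo() {
        Channel<Integer> result = new Channel<>(List.of("hello", "to"))
                .transformWith(upperCase())   // Channel<String> -> Channel<String>
                .transformWith(lengths());    // Channel<String> -> Channel<Integer>
        return result.items.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```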

Benchmark

We measured the performance of CSP in a scenario where a ChannelSupplier transfers 50M Integer objects to a ChannelConsumer, and got the following result:

Time: 4720ms; Average time: 472.0ms; Best time: 469ms; Worst time: 475ms; Operations per second: 105 932 203
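
The reported figures are consistent with each other if we assume that the operations-per-second value is the number of transferred items divided by the average round time, and that the total time covers 10 rounds (4720 ms / 472.0 ms). The short check below reproduces the number under that assumption. ThroughputCheck and opsPerSecond are hypothetical names used only for this calculation.

```java
public class ThroughputCheck {
    // Operations per second for `items` processed in `averageMillis` per round.
    public static long opsPerSecond(long items, double averageMillis) {
        return (long) (items * 1000.0 / averageMillis);
    }

    public static void main(String[] args) {
        // Total time divided by average time gives the number of rounds.
        System.out.println(4720 / 472.0);
        // 50M items per round at 472.0 ms per round.
        System.out.println(opsPerSecond(50_000_000L, 472.0));
    }
}
```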

We also measured the performance of the TCP server that uses both CSP and Datastream and got an average result of 47 495 905 requests per second.

Examples

Note: to run the examples, you need to clone ActiveJ from GitHub:

git clone https://github.com/activej/activej

And import it as a Maven project. Check out tag v6.0-beta2. Before running the examples, build the project. These examples are located at activej/examples/core/csp.

Basic Channel Example

The Channel Example shows the interaction between suppliers and consumers using streamTo and some helper methods:

private static void supplierOfValues() {
    ChannelSuppliers.ofValues("1", "2", "3", "4", "5")
            .streamTo(ChannelConsumers.ofConsumer(System.out::println));
}

private static void supplierOfList(List<String> list) {
    ChannelSuppliers.ofList(list)
            .streamTo(ChannelConsumers.ofConsumer(System.out::println));
}

private static void map() {
    ChannelSuppliers.ofValues(1, 2, 3, 4, 5)
            .map(integer -> integer + " times 10 = " + integer * 10)
            .streamTo(ChannelConsumers.ofConsumer(System.out::println));
}

private static void toCollector() {
    ChannelSuppliers.ofValues(1, 2, 3, 4, 5)
            .toCollector(Collectors.toList())
            .whenResult(x -> System.out.println(x));
}

private static void filter() {
    ChannelSuppliers.ofValues(1, 2, 3, 4, 5, 6)
            .filter(integer -> integer % 2 == 0)
            .streamTo(ChannelConsumers.ofConsumer(System.out::println));
}

So if you run this example, you will get the following result:

1
2
3
4
5
One
Two
Three
1 times 10 = 10
2 times 10 = 20
3 times 10 = 30
4 times 10 = 40
5 times 10 = 50
[1, 2, 3, 4, 5]
2
4
6

See full example on GitHub

CSP Example

This example implements a ReactiveProcess that sits between a ChannelSupplier and a ChannelConsumer. Here the ChannelSupplier is the input and the ChannelConsumer is the output:

public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {
    private ChannelSupplier<String> input;
    private ChannelConsumer<String> output;

    @Override
    public ChannelOutput<String> getOutput() {
        return output -> {
            this.output = output;
            if (this.input != null && this.output != null) startProcess();
        };
    }

    @Override
    public ChannelInput<String> getInput() {
        return input -> {
            this.input = input;
            if (this.input != null && this.output != null) startProcess();
            return getProcessCompletion();
        };
    }

    @Override
    protected void doProcess() {
        input.get()
                .whenResult(data -> {
                    if (data == null) {
                        output.acceptEndOfStream()
                                .whenResult(this::completeProcess);
                    } else {
                        data = data.toUpperCase() + '(' + data.length() + ')';

                        output.accept(data)
                                .whenResult(this::doProcess);
                    }
                })
                .whenException(Throwable::printStackTrace);
    }

    @Override
    protected void doClose(Exception e) {
        System.out.println("Process has been closed with exception: " + e);
        input.closeEx(e);
        output.closeEx(e);
    }

    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.builder()
                .withCurrentThread()
                .build();

        CspExample process = new CspExample();
        ChannelSuppliers.ofValues("hello", "world", "nice", "to", "see", "you")
                .transformWith(process)
                .streamTo(ChannelConsumers.ofConsumer(System.out::println));

        eventloop.run();
    }
}

This process takes a string, converts it to uppercase, and adds the length of the string in parentheses:

HELLO(5)
WORLD(5)
NICE(4)
TO(2)
SEE(3)
YOU(3)

See this example on GitHub

Channel Buffer Example

As mentioned above, there are two main implementations of ChannelQueue: ChannelBuffer and ChannelZeroBuffer, both of which control communication between Consumers and Suppliers. You can manually set the size of the ChannelBuffer, while the size of the ChannelZeroBuffer is always 0.

To understand how these buffers work, let's look at a simple example. Suppose Granny wants to give her Grandson 25 Apples. That is quite a lot, so she first puts the Apples on a large Plate that holds up to 10 Apples at a time. When the Plate is full, the Grandson has to take at least one Apple before Granny can put another Apple on the Plate:

static final class ChannelBufferStream {
    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.builder()
                .withCurrentThread()
                .build();

        ChannelBuffer<Integer> plate = new ChannelBuffer<>(5, 10);
        ChannelSupplier<Integer> granny = plate.getSupplier();
        Promises.loop(0,
                apple -> apple < 25,
                apple -> plate.put(apple).map($ -> {
                    System.out.println("Granny gives apple #" + apple);
                    return apple + 1;
                }));
        granny.streamTo(ChannelConsumers.ofConsumer(apple -> System.out.println("Grandson takes apple #" + apple)));
        eventloop.run();
    }
}

The next day Granny wants to give her Grandson Apples again, but this time there are only 10 Apples. Therefore, no plate is needed: Granny can simply pass the Apples to her Grandson one at a time:

static final class ChannelBufferZeroExample {
    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.builder()
                .withCurrentThread()
                .build();

        ChannelQueue<Integer> buffer = new ChannelZeroBuffer<>();
        ChannelSupplier<Integer> granny = buffer.getSupplier();

        Promises.loop(0,
                apple -> apple < 10,
                apple -> buffer.put(apple).map($ -> {
                    System.out.println("Granny gives apple #" + apple);
                    return apple + 1;
                }));

        granny.streamTo(ChannelConsumers.<Integer>ofConsumer(apple ->
                System.out.println("Grandson takes apple #" + apple)).async());

        eventloop.run();
    }
}

See this example on GitHub

ChannelSplitter Example

In this example we use a predefined ChannelSplitter. The splitter allows you to split data from one input to multiple outputs. In our case the data from a single input is sent to three ChannelConsumers:

public class SplitterExample {
    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.builder()
                .withCurrentThread()
                .build();
        List<Integer> integers = Stream.iterate(1, i -> i + 1)
                .limit(5)
                .collect(Collectors.toList());

        ChannelSplitter<Integer> splitter = ChannelSplitter.create(ChannelSuppliers.ofList(integers));

        List<Integer> list1 = new ArrayList<>();
        List<Integer> list2 = new ArrayList<>();
        List<Integer> list3 = new ArrayList<>();

        splitter.addOutput().set(ChannelConsumers.ofAsyncConsumer(AsyncConsumer.of(list1::add)));
        splitter.addOutput().set(ChannelConsumers.ofAsyncConsumer(AsyncConsumer.of(list2::add)));
        splitter.addOutput().set(ChannelConsumers.ofAsyncConsumer(AsyncConsumer.of(list3::add)));

        eventloop.run();

        System.out.println("First list: " + list1);
        System.out.println("Second list: " + list2);
        System.out.println("Third list: " + list3);
    }
}

See this example on GitHub

ByteBufs Decoder Example

ByteBufsDecoder allows you to efficiently handle ByteBufs and decode the data stored in them for further processing. In this example BinaryChannelSupplier will supply a String decoded and parsed from a ByteBuf.

public final class ByteBufsDecoderExample {
    public static void main(String[] args) {
        Eventloop eventloop = Eventloop
                .builder()
                .withCurrentThread()
                .build();

        List<ByteBuf> letters = List.of(wrapAscii("H"), wrapAscii("e"), wrapAscii("l"), wrapAscii("l"), wrapAscii("o"));
        ByteBufsDecoder<String> decoder = bufs -> {
            if (!bufs.hasRemainingBytes(5)) {
                System.out.println("Not enough bytes to decode message");
                return null;
            }
            return bufs.takeExactSize(5).asString(UTF_8);
        };

        BinaryChannelSupplier.of(ChannelSuppliers.ofList(letters)).decode(decoder)
                .whenResult(x -> System.out.println(x));

        eventloop.run();
    }
}
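
The decoder above follows a common pattern: return null while too few bytes have arrived, and return a message once enough are buffered. The sketch below illustrates that pattern with the Java standard library only. FixedSizeDecoderSketch and feed are hypothetical names, a ByteArrayOutputStream stands in for the accumulated ByteBufs, and the sketch makes no claim about how often ActiveJ actually invokes a decoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FixedSizeDecoderSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private final int messageSize;

    public FixedSizeDecoderSketch(int messageSize) {
        this.messageSize = messageSize;
    }

    // Returns a decoded message, or null while fewer than messageSize bytes are buffered.
    public String feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        if (pending.size() < messageSize) {
            return null;
        }
        byte[] all = pending.toByteArray();
        pending.reset();
        // Keep any bytes beyond the first message for the next call.
        pending.write(all, messageSize, all.length - messageSize);
        return new String(all, 0, messageSize, StandardCharsets.UTF_8);
    }

    public static List<String> demo() {
        FixedSizeDecoderSketch decoder = new FixedSizeDecoderSketch(5);
        List<String> results = new ArrayList<>();
        for (String letter : List.of("H", "e", "l", "l", "o")) {
            String message = decoder.feed(letter.getBytes(StandardCharsets.US_ASCII));
            results.add(message == null ? "need more bytes" : message);
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```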

Channel File Example

This example demonstrates how to work with files asynchronously using Promises and built-in CSP consumers and suppliers. It writes two lines to a file using ChannelFileWriter, then reads and prints them using ChannelFileReader.

private static Promise<Void> writeToFile() {
    return ChannelSuppliers.ofValues(
                    ByteBufStrings.wrapAscii("Hello, this is example file\n"),
                    ByteBufStrings.wrapAscii("This is the second line of file\n"))
            .streamTo(ChannelFileWriter.open(executor, PATH, WRITE));
}

private static Promise<Void> readFile() {
    return ChannelFileReader.open(executor, PATH)
            .then(cfr -> cfr.streamTo(ChannelConsumers.ofConsumer(buf -> System.out.print(buf.asString(UTF_8)))));
}

If you run the example, you will see the contents of the created file:

Hello, this is example file
This is the second line of file

See full example on GitHub