CSP
Overview
CSP (stands for Communicating Sequential Process) provides serial I/O communication between reactive data suppliers and consumers. It is used for asynchronous data streaming. CSP was inspired by the Go language channels.
Features:
- High performance and throughput
- Optimized to work with objects of medium size (like
ByteBufs
) - CSP has reach DSL, which provides a simple programming model
- Has asynchronous back pressure control
Channel Supplier and Channel Consumer
The CSP communication is conducted via ChannelSupplier
and ChannelConsumer
, which provide and receive some data, respectively. Each subsequent request to these channels must only be invoked after the previous request is complete. The CSP uses promises
to manage this.
ChannelSupplier
has a get()
method that returns a Promise
of the provided value. Until this Promise
completes with either a result or an exception, the get()
method should not be called again. Also note that if get()
returns a Promise
of null
, this indicates the end of the stream and no additional data should be requested from that supplier.
ChannelConsumer
has an accept(@Nullable T value)
method that returns a Promise
of null
as a marker of completion of the accepting. Until this Promise
completes, the accept()
method should not be called again. Similar to ChannelSupplier
, if a null
value is accepted, it indicates the end of the stream.
Here is an example of communication between ChannelSupplier
and ChannelConsumer
:
protected void doProcess() {
input.get()
.whenResult(data -> {
if (data == null) {
output.acceptEndOfStream()
.whenResult(this::completeProcess);
} else {
data = data.toUpperCase() + '(' + data.length() + ')';
output.accept(data)
.whenResult(this::doProcess);
}
})
.whenException(Throwable::printStackTrace);
}
Channel Queue
Another important CSP concept is the ChannelQueue
interface and its implementations: ChannelBuffer
and ChannelZeroBuffer
. They provide a link between Consumers and Suppliers and allow them to create chains of these pipes if needed.
Basically, these buffers transfer objects from ChannelConsumer
to ChannelSupplier
as soon as there is free space in the queue. This process is controlled by Promise
s. You can manually set the size of the ChannelBuffer
. The ChannelZeroBuffer
does not store any values, but simply transfers them one at a time from the ChannelConsumer
to the ChannelSupplier
.
Here is a simple example of working with buffers:
public void accept(T item) {
buffer.add(item);
if (buffer.isSaturated()) {
getSupplier().suspend();
}
}
void produce() {
while (!buffer.isEmpty()) {
T item = buffer.poll();
if (item != null) {
send(item);
} else {
sendEndOfStream();
}
}
}
Comparison to Datastream
The CSP has a lot in common with the Datastream module. Although they were both designed for I/O processing, there are a few important differences:
Datastream | CSP | |
---|---|---|
Overhead: | Low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance | No short-circuit evaluation, overhead is higher |
Throughput speed: | Very fast (360 880 548 ops/sec) | Fast (105 932 203 ops/sec), but slower than Datastream |
Programming model: | More complicated | Simple and convenient |
ActiveJ makes extensive use of combinations of CSP and Datastream for maximum efficiency. To do this,
ChannelSupplier
, ChannelConsumer
, StreamSupplier
and StreamConsumer
have transformWith()
methods and special
Transformer interfaces. Using these methods and interfaces, you can easily transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
Benchmark
We've measured the performance of the CSP (ChannelSupplier
transfers 50M Integer
objects to ChannelConsumer
scenario) and got the following result:
Time: 4720ms; Average time: 472.0ms; Best time: 469ms; Worst time: 475ms; Operations per second: 105 932 203
We also measured the performance of the TCP server that uses both CSP and Datastream and got an average result of 47 495 905 requests per second.
Examples
- Basic Channel Example
- CSP Example
- Channel Buffer Example
- ChannelSplitter Example
- ByteBufsDecoderExample
- Channel File Example
To run the examples, you need to clone ActiveJ from GitHub:
git clone https://github.com/activej/activej
And import it as a Maven project. Check out tag v6.0-beta2. Before running the examples, build the project. These examples are located at activej/examples/core/csp
.
Basic Channel Example
The Channel Example shows the interaction between suppliers and consumers using streamTo and some helper methods:
private static void supplierOfValues() {
ChannelSuppliers.ofValues("1", "2", "3", "4", "5")
.streamTo(ChannelConsumers.ofConsumer(System.out::println));
}
private static void supplierOfList(List<String> list) {
ChannelSuppliers.ofList(list)
.streamTo(ChannelConsumers.ofConsumer(System.out::println));
}
private static void map() {
ChannelSuppliers.ofValues(1, 2, 3, 4, 5)
.map(integer -> integer + " times 10 = " + integer * 10)
.streamTo(ChannelConsumers.ofConsumer(System.out::println));
}
private static void toCollector() {
ChannelSuppliers.ofValues(1, 2, 3, 4, 5)
.toCollector(Collectors.toList())
.whenResult(x -> System.out.println(x));
}
private static void filter() {
ChannelSuppliers.ofValues(1, 2, 3, 4, 5, 6)
.filter(integer -> integer % 2 == 0)
.streamTo(ChannelConsumers.ofConsumer(System.out::println));
}
So if you run this example, you will get the following result:
1
2
3
4
5
One
Two
Three
1 times 10 = 10
2 times 10 = 20
3 times 10 = 30
4 times 10 = 40
5 times 10 = 50
[1, 2, 3, 4, 5]
2
4
6
CSP Example
This example represents the ReactiveProcess
between ChannelSupplier
and ChannelConsumer
. In this example ChannelSupplier
represents input and ChannelConsumer
represents output:
public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {
private ChannelSupplier<String> input;
private ChannelConsumer<String> output;
@Override
public ChannelOutput<String> getOutput() {
return output -> {
this.output = output;
if (this.input != null && this.output != null) startProcess();
};
}
@Override
public ChannelInput<String> getInput() {
return input -> {
this.input = input;
if (this.input != null && this.output != null) startProcess();
return getProcessCompletion();
};
}
@Override
//[START REGION_1]
protected void doProcess() {
input.get()
.whenResult(data -> {
if (data == null) {
output.acceptEndOfStream()
.whenResult(this::completeProcess);
} else {
data = data.toUpperCase() + '(' + data.length() + ')';
output.accept(data)
.whenResult(this::doProcess);
}
})
.whenException(Throwable::printStackTrace);
}
//[END REGION_1]
@Override
protected void doClose(Exception e) {
System.out.println("Process has been closed with exception: " + e);
input.closeEx(e);
output.closeEx(e);
}
public static void main(String[] args) {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();
CspExample process = new CspExample();
ChannelSuppliers.ofValues("hello", "world", "nice", "to", "see", "you")
.transformWith(process)
.streamTo(ChannelConsumers.ofConsumer(System.out::println));
eventloop.run();
}
}
This process takes a string, converts it to uppercase, and adds the length of the string in parentheses:
HELLO(5)
WORLD(5)
NICE(4)
TO(2)
SEE(3)
YOU(3)
Channel Buffer Example
As mentioned above, there are two main implementations of ChannelQueue
: ChannelBuffer
and ChannelZeroBuffer
,
both of which control communication between Consumers and Suppliers.
You can manually set the size of the ChannelBuffer
, while the size of the ChannelZeroBuffer
is always 0.
To understand how all these Buffers work, let's look at a simple example. Suppose there is a Granny who wants to give her Grandson 25 Apples. That is quite a lot, so first she puts the Apples on a large Plate on which she can put up to 10 apples at a time. When the Plate is full, the Grandson has to take at least one apple first, and only then can the Granny put another Apple on the Plate:
static final class ChannelBufferStream {
public static void main(String[] args) {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();
ChannelBuffer<Integer> plate = new ChannelBuffer<>(5, 10);
ChannelSupplier<Integer> granny = plate.getSupplier();
Promises.loop(0,
apple -> apple < 25,
apple -> plate.put(apple).map($ -> {
System.out.println("Granny gives apple #" + apple);
return apple + 1;
}));
granny.streamTo(ChannelConsumers.ofConsumer(apple -> System.out.println("Grandson takes apple #" + apple)));
eventloop.run();
}
}
The next day Granny wants to give her Grandson Apples again, but this time there are only 10 Apples. Therefore, no plate is needed: Granny can simply pass the Apples to her Grandson one at a time:
static final class ChannelBufferZeroExample {
public static void main(String[] args) {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();
ChannelQueue<Integer> buffer = new ChannelZeroBuffer<>();
ChannelSupplier<Integer> granny = buffer.getSupplier();
Promises.loop(0,
apple -> apple < 10,
apple -> buffer.put(apple).map($ -> {
System.out.println("Granny gives apple #" + apple);
return apple + 1;
}));
granny.streamTo(ChannelConsumers.<Integer>ofConsumer(apple ->
System.out.println("Grandson takes apple #" + apple)).async());
eventloop.run();
}
}
ChannelSplitter Example
In this example we use a predefined ChannelSplitter
.
The splitter allows you to split data from one input to multiple outputs. In our case the output will be split into three ChannelConsumers
:
public class SplitterExample {
public static void main(String[] args) {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();
List<Integer> integers = Stream.iterate(1, i -> i + 1)
.limit(5)
.collect(Collectors.toList());
ChannelSplitter<Integer> splitter = ChannelSplitter.create(ChannelSuppliers.ofList(integers));
List<Integer> list1 = new ArrayList<>();
List<Integer> list2 = new ArrayList<>();
List<Integer> list3 = new ArrayList<>();
splitter.addOutput().set(ChannelConsumers.ofAsyncConsumer(AsyncConsumer.of(list1::add)));
splitter.addOutput().set(ChannelConsumers.ofAsyncConsumer(AsyncConsumer.of(list2::add)));
splitter.addOutput().set(ChannelConsumers.ofAsyncConsumer(AsyncConsumer.of(list3::add)));
eventloop.run();
System.out.println("First list: " + list1);
System.out.println("Second list: " + list2);
System.out.println("Third list: " + list3);
}
}
ByteBufs Decoder Example
ByteBufsDecoder
allows you to efficiently handle ByteBufs
and decode the data stored in them for further processing. In this example BinaryChannelSupplier
will supply a String decoded and parsed from a ByteBuf.
public final class ByteBufsDecoderExample {
public static void main(String[] args) {
Eventloop eventloop = Eventloop
.builder()
.withCurrentThread()
.build();
List<ByteBuf> letters = List.of(wrapAscii("H"), wrapAscii("e"), wrapAscii("l"), wrapAscii("l"), wrapAscii("o"));
ByteBufsDecoder<String> decoder = bufs -> {
if (!bufs.hasRemainingBytes(5)) {
System.out.println("Not enough bytes to decode message");
return null;
}
return bufs.takeExactSize(5).asString(UTF_8);
};
BinaryChannelSupplier.of(ChannelSuppliers.ofList(letters)).decode(decoder)
.whenResult(x -> System.out.println(x));
eventloop.run();
}
}
Channel File Example
This example demonstrates how to work with files asynchronously using Promises and built-in CSP consumers and suppliers. This example writes two lines to a file using ChannelFileWriter
, then reads and prints them using ChannelFileReader
private static Promise<Void> writeToFile() {
return ChannelSuppliers.ofValues(
ByteBufStrings.wrapAscii("Hello, this is example file\n"),
ByteBufStrings.wrapAscii("This is the second line of file\n"))
.streamTo(ChannelFileWriter.open(executor, PATH, WRITE));
}
private static Promise<Void> readFile() {
return ChannelFileReader.open(executor, PATH)
.then(cfr -> cfr.streamTo(ChannelConsumers.ofConsumer(buf -> System.out.print(buf.asString(UTF_8)))));
}
If you run the example, you will see the contents of the created file:
Hello, this is example file
This is the second line of file