CSP (Communicating Sequential Processes) provides sequential I/O communication between asynchronous data suppliers and consumers. It is used for asynchronous streaming of data and was inspired by channels in the Go language.
- High performance and throughput speed
- Optimized for working with medium-sized objects (like
- CSP has a rich DSL, which provides a simple programming model
- Has asynchronous back pressure management
CSP communication is conducted via ChannelSupplier and ChannelConsumer, which provide and accept data respectively. Each consecutive request to these channels should be made only after the previous request finishes. CSP uses promises to manage this.
ChannelSupplier has a get() method that returns a Promise of the provided value. Until this Promise is completed, either with a result or with an exception, the get() method shouldn't be called again. Also note that if the Promise completes with null, this represents the end of the stream and no additional data should be requested from this supplier.
ChannelConsumer has an accept(@Nullable T value) method which returns a Promise of null as a marker that the value has been accepted. Until this Promise is completed, the accept() method should not be called again. By analogy with ChannelSupplier, if a null value is accepted, it represents the end of the stream.
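The one-request-at-a-time contract can be illustrated without ActiveJ at all. The following is a minimal sketch that assumes `java.util.concurrent.CompletableFuture` as a stand-in for Promise; the `Source`, `Sink` and `pump` names are hypothetical and are not part of the ActiveJ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the contracts described above; not ActiveJ's classes.
interface Source<T> {
    // Completes with the next item, or with null at the end of the stream.
    CompletableFuture<T> get();
}

interface Sink<T> {
    // Completes once the item (or the null end-of-stream marker) has been handled.
    CompletableFuture<Void> accept(T item);
}

final class ChannelProtocolSketch {
    // Moves items one at a time: the next get() is issued only after the
    // previous accept() has completed. Recursion keeps the sketch short;
    // a long, fully synchronous stream would need a loop instead.
    static <T> CompletableFuture<Void> pump(Source<T> source, Sink<T> sink) {
        return source.get().thenCompose(item ->
                sink.accept(item).thenCompose(done ->
                        item == null
                                ? CompletableFuture.<Void>completedFuture(null)
                                : pump(source, sink)));
    }

    // Supplies the list items in order, then null forever.
    static <T> Source<T> sourceOf(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> CompletableFuture.completedFuture(it.hasNext() ? it.next() : null);
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        Sink<String> sink = item -> {
            if (item != null) received.add(item); // null only signals the end
            return CompletableFuture.completedFuture(null);
        };
        pump(sourceOf(Arrays.asList("a", "b", "c")), sink).join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a, b, c]
    }
}
```

The null item is forwarded to the sink before the loop stops, so the consumer also learns that the stream has ended.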
Here is an example of communication between ChannelSupplier and ChannelConsumer:
Another important concept of CSP is the ChannelQueue interface and its implementations: ChannelBuffer and ChannelZeroBuffer. They provide communication between Consumers and Suppliers and allow you to create chains of these pipes if needed.
Basically, these buffers pass objects from ChannelConsumer to ChannelSupplier as soon as the queue has free space. This process is controlled by Promises. You can manually set the size of ChannelBuffer. ChannelZeroBuffer doesn't store any values but simply passes them one by one from ChannelConsumer to ChannelSupplier.
Here is a simple example of working with buffers of items:
CSP has a lot in common with the Datastream module. Although they were both designed for I/O processing, there are several important distinctions:
| | Datastream | CSP |
|---|---|---|
| Overhead | Extremely low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance | No short-circuit evaluation, overhead is higher |
| Throughput speed | Extremely fast (360 880 548 ops/sec) | Fast (105 932 203 ops/sec), but slower than Datastream |
| Programming model | More complicated | Simple and convenient |
To provide maximum efficiency, ActiveJ widely utilizes combinations of CSP and Datastream. For this purpose, channels and datastreams provide transformWith() methods and special Transformer interfaces. Using these methods and interfaces, you can seamlessly transform channels into other channels or datastreams and vice versa, creating chains of such transformations.
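The chaining idea behind transformWith() can be sketched in plain Java. The example below is an assumption-laden model rather than ActiveJ code: `Pull` is a hypothetical synchronous supplier, and a transformer is modeled as a `java.util.function.Function` that receives the supplier and returns whatever it builds from it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical synchronous stand-in for a channel supplier; not ActiveJ's interface.
interface Pull<T> {
    T next(); // null marks the end of the stream

    // Hands this supplier to the transformer and returns whatever it builds.
    default <R> R transformWith(Function<Pull<T>, R> transformer) {
        return transformer.apply(this);
    }
}

final class TransformSketch {
    // Supplies the list items in order, then null.
    static <T> Pull<T> of(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> it.hasNext() ? it.next() : null;
    }

    // Transformer that turns a supplier of T into a supplier of R.
    static <T, R> Function<Pull<T>, Pull<R>> mapping(Function<T, R> fn) {
        return upstream -> () -> {
            T item = upstream.next();
            return item == null ? null : fn.apply(item);
        };
    }

    // Transformer that ends the chain by draining the supplier into a list.
    static <T> Function<Pull<T>, List<T>> toList() {
        return upstream -> {
            List<T> out = new ArrayList<>();
            for (T item = upstream.next(); item != null; item = upstream.next()) {
                out.add(item);
            }
            return out;
        };
    }

    static List<Integer> run() {
        return of(Arrays.asList("a", "bb", "ccc"))
                .transformWith(TransformSketch.<String, Integer>mapping(String::length))
                .transformWith(TransformSketch.<Integer>toList());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3]
    }
}
```

Because transformWith() returns the transformer's result type, the same call can yield another supplier or a completely different kind of object, which is what makes conversions between stream types chainable.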
We've measured CSP performance (a scenario in which a ChannelSupplier streams 50M Integer objects to a ChannelConsumer) and received the following result:
We've also measured TCP server performance that uses both CSP and Datastream and got the average result of 47 495 905 requests per second.
- Basic Channel Example
- CSP Example
- Channel Buffer Example
- ChannelSplitter Example
- Channel File Example
To run the examples, you need to clone ActiveJ from GitHub:
Then import it as a Maven project and check out tag v4.3. Before running the examples, build the project. These examples are located at
Channel Example shows the interaction between suppliers and consumers using streamTo and some helper methods:
Thus, if you run this example, you'll receive the following output:
This example represents an asynchronous process between a ChannelSupplier and a ChannelConsumer. In this example, the ChannelSupplier represents the input and the ChannelConsumer represents the output:
This process takes a string, converts it to upper case, and appends the string's length in parentheses:
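A minimal sketch of that transformation step in plain Java, independent of the ActiveJ classes. The class and method names are hypothetical, and the exact output format (no space before the parenthesis) is an assumption.

```java
import java.util.Locale;

final class UpperCaseSketch {
    // Hypothetical helper: upper-cases the input and appends its length in parentheses.
    static String decorate(String input) {
        return input.toUpperCase(Locale.ROOT) + "(" + input.length() + ")";
    }

    public static void main(String[] args) {
        System.out.println(decorate("hello")); // prints HELLO(5)
    }
}
```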
As mentioned before, there are two ChannelQueue implementations, ChannelBuffer and ChannelZeroBuffer, and both of them manage communication between Consumers and Suppliers. You can manually set the size of ChannelBuffer, whereas the size of ChannelZeroBuffer is always 0.
To understand how these Buffers work, consider a simple example. Assume there is a Granny who wants to give her Grandson 25 Apples. That's quite a lot, so she first puts the Apples on a big Plate, which can hold up to 10 Apples at a time. When the Plate is full, the Grandson must first take at least one Apple, and only after that can Granny put a new Apple on the Plate:
The next day Granny wants to give Apples to her Grandson again, but this time there are only 10 Apples. So there is no real need for the Plate: Granny can simply pass the Apples to her Grandson one by one:
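Both days can be modeled with a small stdlib-only sketch of a bounded buffer with back pressure. It is a simplified, single-threaded model and not ActiveJ's ChannelBuffer: `PlateSketch` and its methods are hypothetical, `CompletableFuture` stands in for Promise, and `take()` is synchronous to keep the code short.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-threaded model of a bounded buffer; not ActiveJ's ChannelBuffer.
final class PlateSketch<T> {
    private final int capacity;
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private CompletableFuture<Void> blockedPut; // set while the producer must wait

    PlateSketch(int capacity) {
        this.capacity = capacity;
    }

    // Stores the item; the returned future stays pending while the plate is over capacity.
    CompletableFuture<Void> put(T item) {
        items.add(item);
        if (items.size() <= capacity) {
            return CompletableFuture.completedFuture(null);
        }
        blockedPut = new CompletableFuture<>();
        return blockedPut;
    }

    // Removes the oldest item (null if empty) and releases a waiting producer
    // once the plate is back within its capacity.
    T take() {
        T item = items.poll();
        if (blockedPut != null && items.size() <= capacity) {
            CompletableFuture<Void> released = blockedPut;
            blockedPut = null;
            released.complete(null);
        }
        return item;
    }

    // Counts how many apples Granny can hand over before she has to wait.
    static int putsBeforeWaiting(int capacity, int apples) {
        PlateSketch<Integer> plate = new PlateSketch<>(capacity);
        int accepted = 0;
        for (int apple = 1; apple <= apples; apple++) {
            if (!plate.put(apple).isDone()) break; // back pressure: stop producing
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(putsBeforeWaiting(10, 25)); // prints 10
        System.out.println(putsBeforeWaiting(0, 10));  // prints 0
    }
}
```

With a capacity of 0, every put stays pending until the matching take, which mirrors passing the Apples one by one.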
In this example we use the predefined ChannelSplitter. A splitter allows you to split data from one input into several outputs. In our case the output will be split into three
ByteBufsDecoder allows you to work efficiently with ByteBufs and decode the data stored in them for further processing. In this example, BinaryChannelSupplier will supply a String decoded and parsed from a ByteBuf.
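The general idea of decoding from incrementally arriving bytes can be sketched with the standard library alone. The model below is hypothetical and does not use ActiveJ's ByteBufsDecoder or ByteBuf: it assumes CRLF-terminated UTF-8 lines, and `tryDecode()` returns null when more bytes are needed.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of an incremental decoder; not ActiveJ's ByteBufsDecoder.
final class LineDecoderSketch {
    private byte[] pending = new byte[0]; // bytes received but not yet decoded

    // Appends a newly arrived chunk to the pending bytes.
    void feed(byte[] chunk) {
        byte[] joined = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, joined, pending.length, chunk.length);
        pending = joined;
    }

    // Returns the next CRLF-terminated line, or null if more bytes are needed.
    String tryDecode() {
        for (int i = 0; i + 1 < pending.length; i++) {
            if (pending[i] == '\r' && pending[i + 1] == '\n') {
                String line = new String(pending, 0, i, StandardCharsets.UTF_8);
                pending = Arrays.copyOfRange(pending, i + 2, pending.length);
                return line;
            }
        }
        return null;
    }

    static List<String> run() {
        LineDecoderSketch decoder = new LineDecoderSketch();
        List<String> lines = new ArrayList<>();
        // The chunk boundaries deliberately do not match the line boundaries.
        for (String chunk : new String[]{"Hel", "lo\r\nWor", "ld\r\n"}) {
            decoder.feed(chunk.getBytes(StandardCharsets.UTF_8));
            for (String line = decoder.tryDecode(); line != null; line = decoder.tryDecode()) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [Hello, World]
    }
}
```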
This example demonstrates how to work with files asynchronously using Promises and the built-in CSP consumers and suppliers. It writes two lines to a file with ChannelFileWriter, then reads and prints them out using ChannelFileReader.
If you run the example, you'll see the content of the created file: