CSP (stands for Communicating Sequential Process) provides I/O communication between channels and was inspired by the Go
High performance and throughput speed
Optimized for working with medium-sized objects (like ByteBufs)
CSP has reach DSL, which provides a simple programming model
Has an asynchronous back pressure management
Channel Supplier and Channel Consumer
CSP communication is conducted with ChannelSupplier
and ChannelConsumer, which provide and accept some data
respectively. Each consecutive request to these channels should be called only after the previous request finishes, and
Promises are utilized to manage it.
ChannelSupplier has a get() method that returns a Promise of the provided value. Until
this Promise is completed either with a result or with an exception, the method shouldn’t be called again. Also
note, that if get() returns Promise of null, this represents the end of the stream and no additional data
should be requested from this supplier.
ChannelConsumer has an accept(@Nullable T value) method which returns a Promise of null as a marker of
completion of the accepting. Until this Promise is completed, accept() method should not be called again. By analogy
with the ChannelSupplier, if a null value is accepted, it represents the end of the stream.
Here is an example of communication between ChannelSupplier and ChannelConsumer:
Another important concept of CSP is ChannelQueue
interface and its implementations: ChannelBuffer
and ChannelZeroBuffer. They provide communication between Consumers and Suppliers and allow them to create chains of these
pipes if needed.
Basically, these buffers pass objects which were consumed by ChannelConsumer to ChannelSupplier as soon as the
queue gets a free space. This process is controlled by Promises. You can manually set the size for ChannelBuffer.
ChannelZeroBuffer doesn’t store any values but simply passes them one by one from ChannelConsumer to
Here is a simple example of working with buffers of items:
Comparison to Datastream
CSP has a lot in common with the Datastream module.
Although they were both designed for I/O processing, there are several important distinctions:
Extremely low: stream can be started with 1 virtual call, short-circuit evaluation optimizes performance
No short-circuit evaluation, overhead is higher
Extremely fast (360880548 ops/sec)
Fast (105932203 ops/sec), but slower than Datastream
Simple and convenient
To provide maximum efficiency, ActiveJ widely utilizes combinations of CSP and Datastream. For this purpose,
ChannelSupplier, ChannelConsumer, StreamSupplier
and StreamConsumer have transformWith() methods and special
Transformer interfaces. Using them, you can seamlessly transform channels into other channels or datastreams and vice
versa, creating chains of such transformations.
We’ve measured CSP performance (ChannelSupplier streams 50M Integer objects to ChannelConsumer scenario)
and received the following result:
We’ve also measured TCP server performance that uses both CSP and Datastream and got the average result of 47495905 requests per second.
To run the examples, you need to clone ActiveJ from GitHub:
$ git clone https://github.com/activej/activej And import it as a Maven project. Check out branch master. Before running the examples, build the project.
These examples are located at activej -> examples -> core -> csp.
Basic Channel Example
Channel Example shows the interaction between suppliers and consumers using streamTo and some helper methods:
Thus, if you run this example, you’ll receive the following output:
As it was mentioned before, there are two ChannelQueue implementations: ChannelBuffer and ChannelZeroBuffer,
both of them manage communication between Providers and Suppliers.
You can manually set the size of ChannelBuffer, whereas ChannelZeroBuffer size is always 0.
To understand how all these Buffers work, let’s have a simple example. Assume there is a Granny
who wants to give her Grandson 25 Apples. That’s quite a lot, so she first puts the Apples on a big Plate,
which can place up to 10 apples simultaneously. When the Plate is full, Grandson should first take at least one apple,
and only after that Granny can put a new Apple to the Plate:
On the next day Granny wants to give Apples to her Grandson again, but this time there are only 10
Apples. So there is no real need of the plate: Granny can simply pass the Apples to her Grandson
one by one:
You can create chains of transformations of data that is provided by ChannelSupplier. Use transformWith method
and predefined CSP chunkers, compressors, decompressors, etc. In this example we will transform supplier’s ByteBufs,
chunk, compress and decompress them:
This example demonstrates how to work with files asynchronously using Promises and CSP built-in
consumers and suppliers. This example writes two lines to the file with ChannelFileWriter, then reads and prints
them out utilizing ChannelFileReader:
If you run the example, you’ll see the content of the created file: