


CSP (Communicating Sequential Processes) provides sequential I/O communication between asynchronous data suppliers and consumers. It is used for asynchronous streaming of data and was inspired by channels in the Go language.


  • High performance and throughput
  • Optimized for working with medium-sized objects (like ByteBufs)
  • Rich DSL that provides a simple programming model
  • Asynchronous back pressure management

Channel Supplier and Channel Consumer

CSP communication is conducted via ChannelSupplier and ChannelConsumer, which provide and accept some data respectively. Each consecutive request to these channels should be called only after the previous request finishes. CSP uses promises to manage it.

ChannelSupplier has a get() method that returns a Promise of the provided value. Until this Promise completes, either with a result or with an exception, get() must not be called again. Note also that if get() returns a Promise of null, this marks the end of the stream, and no further data should be requested from this supplier.

ChannelConsumer has an accept(@Nullable T value) method that returns a Promise of null, which completes once the value has been accepted. Until this Promise completes, accept() must not be called again. By analogy with ChannelSupplier, accepting a null value marks the end of the stream.

Here is an example of communication between ChannelSupplier and ChannelConsumer:

protected void doProcess() {
  input.get()
      .whenResult(data -> {
        if (data == null) {
          output.acceptEndOfStream()
              .whenResult(this::completeProcess);
        } else {
          data = data.toUpperCase() + '(' + data.length() + ')';

          output.accept(data)
              .whenResult(this::doProcess);
        }
      })
      .whenException(Throwable::printStackTrace);
}
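The contract described above (one outstanding request at a time, null marking the end of the stream) can also be sketched without ActiveJ. The snippet below is only an illustration under stated assumptions: it uses java.util.concurrent.CompletableFuture as a stand-in for Promise, and ProtocolSketch, SketchSupplier, SketchConsumer, supplierOf and stream are hypothetical names, not ActiveJ types or methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ProtocolSketch {
    /** Hypothetical stand-in for ChannelSupplier: a null result means end of stream. */
    interface SketchSupplier<T> {
        CompletableFuture<T> get();
    }

    /** Hypothetical stand-in for ChannelConsumer: accepting null means end of stream. */
    interface SketchConsumer<T> {
        CompletableFuture<Void> accept(T value);
    }

    /** Supplies the items one by one, then null to signal the end of the stream. */
    static <T> SketchSupplier<T> supplierOf(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> CompletableFuture.completedFuture(it.hasNext() ? it.next() : null);
    }

    /** Requests the next item only after the previous accept() has completed. */
    static <T> CompletableFuture<Void> stream(SketchSupplier<T> input, SketchConsumer<T> output) {
        return input.get().thenCompose(item -> {
            if (item == null) {
                return output.accept(null); // forward the end-of-stream marker and stop
            }
            return output.accept(item).thenCompose(v -> stream(input, output));
        });
    }

    /** Streams three strings into a list-backed consumer and returns what it received. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SketchConsumer<String> sink = value -> {
            received.add(value == null ? "<end>" : value);
            return CompletableFuture.completedFuture(null);
        };
        stream(supplierOf(Arrays.asList("a", "b", "c")), sink).join();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because every future in this sketch is already completed, the whole chain runs synchronously; with genuinely asynchronous suppliers the same thenCompose chain would simply resume whenever each future completes.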

Channel Queue

Another important concept of CSP is the ChannelQueue interface and its implementations, ChannelBuffer and ChannelZeroBuffer. They provide communication between Consumers and Suppliers and allow you to build chains of these pipes if needed.

Basically, these buffers pass objects from ChannelConsumer to ChannelSupplier as soon as the queue has free space. This process is controlled by Promises. You can manually set the size of ChannelBuffer. ChannelZeroBuffer doesn't store any values but simply passes them one by one from ChannelConsumer to ChannelSupplier.

Here is a simple example of working with buffers of items:

public void accept(T item) {
    buffer.add(item);
    if (buffer.isSaturated()) {
        getSupplier().suspend();
    }
}

void produce() {
    while (!buffer.isEmpty()) {
        T item = buffer.poll();
        if (item != null) {
            send(item);
        } else {
            sendEndOfStream();
        }
    }
}
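To make the back pressure idea more concrete, here is a minimal single-threaded sketch of a bounded buffer whose put() future stays incomplete while the buffer is full. It is not ActiveJ's ChannelBuffer: BoundedBufferSketch and its methods are hypothetical, CompletableFuture stands in for Promise, and the sketch assumes the CSP contract that put() and take() are not called again before the previous future completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical single-threaded bounded buffer; not ActiveJ's ChannelBuffer. */
final class BoundedBufferSketch<T> {
    private final int capacity;
    private final ArrayDeque<T> items = new ArrayDeque<>();
    private CompletableFuture<Void> pendingPut; // completes once a slot is freed
    private CompletableFuture<T> pendingTake;   // completes once an item arrives

    BoundedBufferSketch(int capacity) {
        this.capacity = capacity;
    }

    /** Stores the item; the returned future stays incomplete while the buffer is full. */
    CompletableFuture<Void> put(T item) {
        if (pendingTake != null) {              // a consumer is already waiting: hand over directly
            CompletableFuture<T> take = pendingTake;
            pendingTake = null;
            take.complete(item);
            return CompletableFuture.completedFuture(null);
        }
        items.add(item);
        if (items.size() < capacity) {
            return CompletableFuture.completedFuture(null);
        }
        pendingPut = new CompletableFuture<>(); // back pressure: the producer has to wait
        return pendingPut;
    }

    /** Returns the next item, or a future that completes when one is put. */
    CompletableFuture<T> take() {
        if (items.isEmpty()) {
            pendingTake = new CompletableFuture<>();
            return pendingTake;
        }
        T item = items.poll();
        if (pendingPut != null) {               // a slot is free again: release the producer
            CompletableFuture<Void> put = pendingPut;
            pendingPut = null;
            put.complete(null);
        }
        return CompletableFuture.completedFuture(item);
    }

    /** Fills a two-slot buffer, then frees a slot and records how the put() futures behave. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        BoundedBufferSketch<Integer> plate = new BoundedBufferSketch<>(2);
        log.add("put 1 done: " + plate.put(1).isDone());
        CompletableFuture<Void> second = plate.put(2);
        log.add("put 2 done: " + second.isDone());
        log.add("took " + plate.take().join());
        log.add("put 2 done: " + second.isDone());
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In the demo, the second put() fills the buffer, so its future only completes after take() frees a slot. A zero-capacity variant would behave like a direct hand-over, where every put() waits for a matching take().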

Comparison to Datastream

CSP has a lot in common with the Datastream module. Although they were both designed for I/O processing, there are several important distinctions:

Overhead:
  • Datastream: extremely low; a stream can be started with 1 virtual call, and short-circuit evaluation optimizes performance
  • CSP: no short-circuit evaluation, overhead is higher

Throughput speed:
  • Datastream: extremely fast (360 880 548 ops/sec)
  • CSP: fast (105 932 203 ops/sec), but slower than Datastream

Programming model:
  • Datastream: more complicated
  • CSP: simple and convenient

To provide maximum efficiency, ActiveJ widely utilizes combinations of CSP and Datastream. For this purpose, ChannelSupplier, ChannelConsumer, StreamSupplier and StreamConsumer have transformWith() methods and special Transformer interfaces. Using these methods and interfaces, you can seamlessly transform channels into other channels or datastreams and vice versa, creating chains of such transformations.


We've measured CSP performance (ChannelSupplier streams 50M Integer objects to ChannelConsumer scenario) and received the following result:

Time: 4720ms; Average time: 472.0ms; Best time: 469ms; Worst time: 475ms; Operations per second: 105 932 203
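The reported figures can be cross-checked with simple arithmetic: 50M items divided by the 472.0 ms average time gives the stated operations per second, and a total of 4720 ms at that average is consistent with ten rounds (the round count is our inference, it is not stated in the text). The helper below is a hypothetical sketch of that calculation, not part of the benchmark itself.

```java
final class ThroughputCheck {
    /** Items streamed per second, given the item count and the elapsed milliseconds. */
    static long opsPerSecond(long items, double elapsedMillis) {
        return (long) (items / (elapsedMillis / 1000.0));
    }

    public static void main(String[] args) {
        // 50M Integer objects per round and a 472.0 ms average round time (figures from the text)
        System.out.println(opsPerSecond(50_000_000L, 472.0)); // 105932203
    }
}
```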

We've also measured TCP server performance that uses both CSP and Datastream and got the average result of 47 495 905 requests per second.



To run the examples, you need to clone ActiveJ from GitHub:

git clone

And import it as a Maven project. Check out tag v5.2. Before running the examples, build the project. These examples are located at activej/examples/core/csp.

Basic Channel Example

Channel Example shows the interaction between suppliers and consumers using streamTo and some helper methods:

private static void supplierOfValues() {
  ChannelSupplier.of("1", "2", "3", "4", "5")
      .streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

private static void supplierOfList(List<String> list) {
  ChannelSupplier.ofList(list)
      .streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

private static void map() {
  ChannelSupplier.of(1, 2, 3, 4, 5)
      .map(integer -> integer + " times 10 = " + integer * 10)
      .streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

private static void toCollector() {
  ChannelSupplier.of(1, 2, 3, 4, 5)
      .toCollector(Collectors.toList())
      .whenResult(x -> System.out.println(x));
}

private static void filter() {
  ChannelSupplier.of(1, 2, 3, 4, 5, 6)
      .filter(integer -> integer % 2 == 0)
      .streamTo(ChannelConsumer.ofConsumer(System.out::println));
}

If you run this example, you'll receive the following output:

1
2
3
4
5
One
Two
Three
1 times 10 = 10
2 times 10 = 20
3 times 10 = 30
4 times 10 = 40
5 times 10 = 50
[1, 2, 3, 4, 5]
2
4
6

See full example on GitHub

CSP Example

This example implements an asynchronous process that sits between a ChannelSupplier and a ChannelConsumer. Here the ChannelSupplier is the input and the ChannelConsumer is the output:

public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {
  private ChannelSupplier<String> input;
  private ChannelConsumer<String> output;

  @Override
  public ChannelOutput<String> getOutput() {
    return output -> {
      this.output = output;
      if (this.input != null && this.output != null) startProcess();
    };
  }

  @Override
  public ChannelInput<String> getInput() {
    return input -> {
      this.input = input;
      if (this.input != null && this.output != null) startProcess();
      return getProcessCompletion();
    };
  }

  @Override
  protected void doProcess() {
    input.get()
        .whenResult(data -> {
          if (data == null) {
            // End of stream: propagate it downstream and finish the process
            output.acceptEndOfStream()
                .whenResult(this::completeProcess);
          } else {
            data = data.toUpperCase() + '(' + data.length() + ')';

            // Request the next item only after this one has been accepted
            output.accept(data)
                .whenResult(this::doProcess);
          }
        })
        .whenException(Throwable::printStackTrace);
  }

  @Override
  protected void doClose(Exception e) {
    System.out.println("Process has been closed with exception: " + e);
    input.closeEx(e);
    output.closeEx(e);
  }

  public static void main(String[] args) {
    Eventloop eventloop = Eventloop.create().withCurrentThread();

    CspExample process = new CspExample();
    ChannelSupplier.of("hello", "world", "nice", "to", "see", "you")
        .transformWith(process)
        .streamTo(ChannelConsumer.ofConsumer(System.out::println));

    // Run the event loop so that the scheduled tasks are executed
    eventloop.run();
  }
}

This process takes a string, converts it to upper case and appends the string's length in parentheses:

HELLO(5)
WORLD(5)
NICE(4)
TO(2)
SEE(3)
YOU(3)

See this example on GitHub

Channel Buffer Example

As mentioned before, there are two ChannelQueue implementations, ChannelBuffer and ChannelZeroBuffer. Both manage communication between Consumers and Suppliers. You can manually set the size of ChannelBuffer, whereas the size of ChannelZeroBuffer is always 0.

To understand how these buffers work, let's look at a simple example. Assume there is a Granny who wants to give her Grandson 25 Apples. That's quite a lot, so she first puts the Apples on a big Plate, which can hold up to 10 Apples at a time. When the Plate is full, the Grandson must take at least one Apple before Granny can put another Apple on the Plate:

static final class ChannelBufferStream {
  public static void main(String[] args) {
    Eventloop eventloop = Eventloop.create().withCurrentThread();

    ChannelBuffer<Integer> plate = new ChannelBuffer<>(5, 10);
    ChannelSupplier<Integer> granny = plate.getSupplier();
    Promises.loop(0,
        apple -> apple < 25,
        apple -> plate.put(apple).map($ -> {
          System.out.println("Granny gives apple   #" + apple);
          return apple + 1;
        }));
    granny.streamTo(ChannelConsumer.ofConsumer(apple -> System.out.println("Grandson takes apple #" + apple)));

    // Run the event loop so that the scheduled tasks are executed
    eventloop.run();
  }
}

The next day Granny wants to give Apples to her Grandson again, but this time there are only 10 Apples. There is no real need for the Plate, so Granny can simply pass the Apples to her Grandson one by one:

static final class ChannelBufferZeroExample {
  public static void main(String[] args) {
    Eventloop eventloop = Eventloop.create().withCurrentThread();

    ChannelQueue<Integer> buffer = new ChannelZeroBuffer<>();
    ChannelSupplier<Integer> granny = buffer.getSupplier();

    Promises.loop(0,
        apple -> apple < 10,
        apple -> buffer.put(apple).map($ -> {
          System.out.println("Granny gives apple   #" + apple);
          return apple + 1;
        }));

    granny.streamTo(ChannelConsumer.<Integer>ofConsumer(apple ->
        System.out.println("Grandson takes apple #" + apple)).async());

    // Run the event loop so that the scheduled tasks are executed
    eventloop.run();
  }
}

See this example on GitHub

ChannelSplitter Example

In this example we use the predefined ChannelSplitter. A splitter distributes data from one input to several outputs. In our case, the input is split across three ChannelConsumers:

public class SplitterExample {
  public static void main(String[] args) {
    Eventloop eventloop = Eventloop.create().withCurrentThread();
    List<Integer> integers = Stream.iterate(1, i -> i + 1)
        .limit(5)
        .collect(Collectors.toList());

    ChannelSplitter<Integer> splitter = ChannelSplitter.create(ChannelSupplier.ofList(integers));

    List<Integer> list1 = new ArrayList<>();
    List<Integer> list2 = new ArrayList<>();
    List<Integer> list3 = new ArrayList<>();

    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list1::add)));
    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list2::add)));
    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list3::add)));

    // Run the event loop so that the lists are filled before they are printed
    eventloop.run();

    System.out.println("First list: " + list1);
    System.out.println("Second list: " + list2);
    System.out.println("Third list: " + list3);
  }
}
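The fan-out idea can be sketched with plain Java collections. The snippet below is a simplified, synchronous illustration and not ActiveJ's ChannelSplitter: SplitterSketch, addOutput and split are hypothetical names, and the sketch assumes that every registered output receives every input item, which is how we read the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical synchronous fan-out helper; not ActiveJ's ChannelSplitter. */
final class SplitterSketch<T> {
    private final List<Consumer<T>> outputs = new ArrayList<>();

    /** Registers one more output that will receive the input items. */
    void addOutput(Consumer<T> output) {
        outputs.add(output);
    }

    /** Delivers every input item to every registered output, in order. */
    void split(Iterable<T> input) {
        for (T item : input) {
            for (Consumer<T> output : outputs) {
                output.accept(item);
            }
        }
    }

    /** Sends the integers 1..5 to three list-backed outputs and returns the lists. */
    static List<List<Integer>> demo() {
        List<Integer> list1 = new ArrayList<>();
        List<Integer> list2 = new ArrayList<>();
        List<Integer> list3 = new ArrayList<>();

        SplitterSketch<Integer> splitter = new SplitterSketch<>();
        splitter.addOutput(list1::add);
        splitter.addOutput(list2::add);
        splitter.addOutput(list3::add);
        splitter.split(Arrays.asList(1, 2, 3, 4, 5));

        return Arrays.asList(list1, list2, list3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Unlike this sketch, a channel-based splitter has to wait for each output's Promise before it moves on, so a slow consumer can hold back the others.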

See this example on GitHub

ByteBufs Decoder Example

ByteBufsDecoder lets you work with ByteBufs efficiently and decode the data stored in them for further processing. In this example, BinaryChannelSupplier supplies a String decoded and parsed from a sequence of ByteBufs.

public final class ByteBufsDecoderExample {
  public static void main(String[] args) {
    Eventloop eventloop = Eventloop.create().withCurrentThread();

    List<ByteBuf> letters = asList(wrapAscii("H"), wrapAscii("e"), wrapAscii("l"), wrapAscii("l"), wrapAscii("o"));
    ByteBufsDecoder<String> decoder = bufs -> {
      // Returning null tells the supplier that more bytes are needed
      if (!bufs.hasRemainingBytes(5)) {
        System.out.println("Not enough bytes to decode message");
        return null;
      }
      return bufs.takeExactSize(5).asString(UTF_8);
    };

    BinaryChannelSupplier.of(ChannelSupplier.ofList(letters)).decode(decoder)
        .whenResult(x -> System.out.println(x));

    // Run the event loop so that the scheduled tasks are executed
    eventloop.run();
  }
}
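The decoder above follows a common pattern: keep buffering incoming chunks and return null until enough bytes have arrived. The snippet below sketches that pattern with the standard library only. FixedLengthDecoderSketch and feed are hypothetical names, and a ByteArrayOutputStream stands in for the ByteBufs queue, so this is an illustration rather than how ByteBufsDecoder is implemented.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical fixed-length decoder; a stand-in for the ByteBufsDecoder lambda above. */
final class FixedLengthDecoderSketch {
    private final int length;
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthDecoderSketch(int length) {
        this.length = length;
    }

    /** Buffers the chunk; returns a decoded message once enough bytes have arrived, else null. */
    String feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        if (pending.size() < length) {
            return null; // not enough bytes yet, the caller should supply more
        }
        byte[] all = pending.toByteArray();
        pending.reset();
        // Keep any bytes beyond the message for the next call
        pending.write(all, length, all.length - length);
        return new String(all, 0, length, StandardCharsets.UTF_8);
    }

    /** Feeds five one-byte chunks and returns the result of the last call. */
    static String demo() {
        FixedLengthDecoderSketch decoder = new FixedLengthDecoderSketch(5);
        String message = null;
        for (String letter : Arrays.asList("H", "e", "l", "l", "o")) {
            message = decoder.feed(letter.getBytes(StandardCharsets.UTF_8));
        }
        return message;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // Hello
    }
}
```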

Channel File Example

This example demonstrates how to work with files asynchronously using Promises and the built-in CSP consumers and suppliers. It writes two lines to a file with ChannelFileWriter, then reads and prints them with ChannelFileReader.

private static @NotNull Promise<Void> writeToFile() {
  return ChannelSupplier.of(
      ByteBufStrings.wrapAscii("Hello, this is example file\n"),
      ByteBufStrings.wrapAscii("This is the second line of file\n"))
      .streamTo(, PATH, WRITE));
}

private static @NotNull Promise<Void> readFile() {
  return, PATH)
      .then(cfr -> cfr.streamTo(ChannelConsumer.ofConsumer(buf -> System.out.print(buf.asString(UTF_8)))));
}

If you run the example, you'll see the content of the created file:

Hello, this is example file
This is the second line of file
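For comparison, the same write-then-read sequence can be sketched with the standard library alone. This is not the CSP approach: CompletableFuture stands in for Promise, the file is written and read as a whole rather than streamed, and AsyncFileSketch together with its methods is a hypothetical name chosen for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncFileSketch {
    /** Writes the lines on a background thread; the future completes when the write is done. */
    static CompletableFuture<Void> writeToFile(Path path, List<String> lines) {
        return CompletableFuture.runAsync(() -> {
            try {
                Files.write(path, lines, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Reads all lines on a background thread. */
    static CompletableFuture<List<String>> readFile(Path path) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return Files.readAllLines(path, StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    private static Path newTempFile() {
        try {
            return Files.createTempFile("csp-sketch", ".txt");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Chains the read after the write, mirroring the order used in the example above. */
    static List<String> demo() {
        Path path = newTempFile();
        path.toFile().deleteOnExit();
        return writeToFile(path, Arrays.asList(
                "Hello, this is example file",
                "This is the second line of file"))
                .thenCompose(v -> readFile(path))
                .join();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```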

See full example on GitHub