Перейти к основному содержанию

CSP

Обзор

CSP (расшифровывается как Communicating Sequential Process) обеспечивает последовательное взаимодействие ввода-вывода между асинхронными поставщиками и потребителями данных. Он используется для асинхронной потоковой передачи данных. CSP был вдохновлен каналами языка Go.

Характеристики:

  • Высокая производительность и пропускная способность
  • Оптимизирован для работы с объектами среднего размера (например, ByteBufs).
  • В CSP есть DSL, который обеспечивает простую модель программирования
  • Имеет асинхронное управление back pressure

Channel Supplier и Channel Consumer

Связь с CSP осуществляется через ChannelSupplier и ChannelConsumer, которые предоставляют и принимают некоторые данные соответственно. Каждый последующий запрос к этим каналам должен вызываться только после завершения предыдущего запроса. Для управления CSP использует promises.

ChannelSupplier имеет метод get() , который возвращает Promise предоставленного значения. Пока этот Promise не завершится либо с результатом, либо с исключением, метод get() не должен вызываться снова. Также обратите внимание, что если get() возвращает Promise из null, то это означает конец потока и никаких дополнительных данных у этого поставщика запрашивать не следует.

ChannelConsumer имеет метод accept(@Nullable T value) , который возвращает Promise из null в качестве маркера завершения приема. Пока этот Promise не завершится, метод accept() не должен вызываться снова. По аналогии с ChannelSupplier, если принимается значение null , оно представляет собой конец потока.

Вот пример коммуникации между ChannelSupplier и ChannelConsumer:

protected void doProcess() {  input.get()      .whenResult(data -> {        if (data == null) {          output.acceptEndOfStream()              .whenResult(this::completeProcess);        } else {          data = data.toUpperCase() + '(' + data.length() + ')';
          output.accept(data)              .whenResult(this::doProcess);        }      })      .whenException(Throwable::printStackTrace);}

Очередь каналов

Другой важной концепцией CSP является ChannelQueue интерфейс и его реализации: ChannelBuffer и ChannelZeroBuffer. Они обеспечивают связь между Consumers и Suppliers и позволяют при необходимости создавать цепочки пайпов. В основном, эти буферы передают объекты от ChannelConsumer к ChannelSupplier , как только в очереди появляется свободное место. Этот процесс контролируется Promises. Вы можете вручную установить размер ChannelBuffer. ChannelZeroBuffer не хранит никаких значений, а просто передает их по одному от ChannelConsumer к ChannelSupplier. Вот простой пример работы с буферами элементов:

public void accept(T item) {    buffer.add(item);    if (buffer.isSaturated()) {        getSupplier().suspend();    }}
void produce() {    while (!buffer.isEmpty()) {        T item = buffer.poll();        if (item != null) {            send(item);        } else {            sendEndOfStream();        }    }}

Сравнение с Datastream

CSP имеет много общего с модулем Datastream . Хотя они оба были разработаны для обработки ввода-вывода, существует несколько важных различий:

DatastreamCSP
Overhead:Небольшой: поток может быть запущен с 1 виртуального вызова, оценка короткого замыкания оптимизирует производительностьНет оценки короткого замыкания, overhead выше
Скорость пропускной способности:Чрезвычайно быстро (360 880 548 операций в секунду)Быстро (105 932 203 оп/с), но медленнее, чем Datastream.
Модель программирования:Более сложнаяПростая и удобная

Для обеспечения максимальной эффективности ActiveJ широко использует комбинации CSP и Datastream. Для этого ChannelSupplier, ChannelConsumer, StreamSupplier и StreamConsumer имеют методы transformWith() и специальные интерфейсы Transformer. Используя эти методы и интерфейсы, вы можете легко преобразовывать каналы в другие каналы или потоки данных и наоборот, создавая цепочки таких преобразований.

Производительность

Мы измерили производительность CSP (ChannelSupplier стримил 50M Integer объектов в ChannelConsumer) и получили следующий результат:

Время: 4720 мс; Среднее время: 472.0ms; Лучшее время: 469 мс; Худшее время: 475 мс; Операции в секунду: 105 932 203

Мы также измерили производительность TCP-сервера, использующего как CSP, так и Datastream, и получили средний результат 47 495 905 запросов в секунду.

Примеры

note

Чтобы запустить примеры, необходимо клонировать ActiveJ с GitHub:

git clone https://github.com/activej/activej

И импортируйте его как проект Maven. Посмотрите тег v5.4.3. Перед запуском примеров выполните сборку проекта. Эти примеры расположены по адресу activej/examples/core/csp.

Пример Basic Channel

Пример канала показывает взаимодействие между поставщиками и потребителями, используя streamTo и некоторые вспомогательные методы:

private static void supplierOfValues() {  ChannelSupplier.of("1", "2", "3", "4", "5")      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}
private static void supplierOfList(List<String> list) {  ChannelSupplier.ofList(list)      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}
private static void map() {  ChannelSupplier.of(1, 2, 3, 4, 5)      .map(integer -> integer + " times 10 = " + integer * 10)      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}
private static void toCollector() {  ChannelSupplier.of(1, 2, 3, 4, 5)      .toCollector(Collectors.toList())      .whenResult(x -> System.out.println(x));}
private static void filter() {  ChannelSupplier.of(1, 2, 3, 4, 5, 6)      .filter(integer -> integer % 2 == 0)      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}

Таким образом, если вы запустите этот пример, вы получите следующий результат:

12345OneTwoThree1 times 10 = 102 times 10 = 203 times 10 = 304 times 10 = 405 times 10 = 50[1, 2, 3, 4, 5]246

Полный текст примера смотрите на GitHub.

Пример CSP

Данный пример представляет собой AsyncProcess между ChannelSupplier и ChannelConsumer. В данном примере ChannelSupplier представляет собой вход, а ChannelConsumer - выход:

public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {  private ChannelSupplier<String> input;  private ChannelConsumer<String> output;
  @Override  public ChannelOutput<String> getOutput() {    return output -> {      this.output = output;      if (this.input != null && this.output != null) startProcess();    };  }
  @Override  public ChannelInput<String> getInput() {    return input -> {      this.input = input;      if (this.input != null && this.output != null) startProcess();      return getProcessCompletion();    };  }
  @Override  //[START REGION_1]  protected void doProcess() {    input.get()        .whenResult(data -> {          if (data == null) {            output.acceptEndOfStream()                .whenResult(this::completeProcess);          } else {            data = data.toUpperCase() + '(' + data.length() + ')';
            output.accept(data)                .whenResult(this::doProcess);          }        })        .whenException(Throwable::printStackTrace);  }  //[END REGION_1]
  @Override  protected void doClose(Exception e) {    System.out.println("Process has been closed with exception: " + e);    input.closeEx(e);    output.closeEx(e);  }
  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    CspExample process = new CspExample();    ChannelSupplier.of("hello", "world", "nice", "to", "see", "you")        .transformWith(process)        .streamTo(ChannelConsumer.ofConsumer(System.out::println));
    eventloop.run();  }}

Этот процесс принимает строку, переводит ее в верхний регистр и добавляет длину строки в круглых скобках:

HELLO(5)WORLD(5)NICE(4)TO(2)SEE(3)YOU(3)

Смотрите этот пример на GitHub.

Пример Channel Buffer

Как уже было сказано ранее, существует две реализации ChannelQueue: ChannelBuffer и ChannelZeroBuffer, обе они управляют коммуникацией между Providers и Suppliers. Вы можете вручную установить размер ChannelBuffer, в то время как ChannelZeroBuffer размер всегда равен 0.

Чтобы понять, как работают все эти буферы, давайте рассмотрим простой пример. Предположим, что существует Granny, которая хочет подарить своему Grandson 25 Appleс. Это довольно много, поэтому сначала она помещает Apples на большой Plate, на котором можно разместить до 10 яблок одновременно. Когда тарелка ** наполнится, Grandson должен сначала взять хотя бы одно яблоко, и только после этого Granny может положить новое Apple на Plate**:

static final class ChannelBufferStream {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    ChannelBuffer<Integer> plate = new ChannelBuffer<>(5, 10);    ChannelSupplier<Integer> granny = plate.getSupplier();    Promises.loop(0,        apple -> apple < 25,        apple -> plate.put(apple).map($ -> {          System.out.println("Granny gives apple   #" + apple);          return apple + 1;        }));    granny.streamTo(ChannelConsumer.ofConsumer(apple -> System.out.println("Grandson takes apple #" + apple)));    eventloop.run();  }}

На следующий день Granny хочет дать Appleсвоему Grandson снова, но на этот раз есть только 10 Apples. Таким образом, тарелка не нужна: Granny может просто передавать Apples своему Grandson одно за другим:

static final class ChannelBufferZeroExample {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    ChannelQueue<Integer> buffer = new ChannelZeroBuffer<>();    ChannelSupplier<Integer> granny = buffer.getSupplier();
    Promises.loop(0,        apple -> apple < 10,        apple -> buffer.put(apple).map($ -> {          System.out.println("Granny gives apple   #" + apple);          return apple + 1;        }));
    granny.streamTo(ChannelConsumer.<Integer>ofConsumer(apple ->        System.out.println("Grandson takes apple #" + apple)).async());
    eventloop.run();  }}

Смотрите этот пример на GitHub.

Пример ChannelSplitter

В этом примере мы используем предопределенные ChannelSplitter. Splitter позволяет разделить данные с одного входа на несколько выходов. В нашем случае вывод будет разделен на три ChannelConsumers:

public class SplitterExample {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();    List<Integer> integers = Stream.iterate(1, i -> i + 1)        .limit(5)        .collect(Collectors.toList());
    ChannelSplitter<Integer> splitter = ChannelSplitter.create(ChannelSupplier.ofList(integers));
    List<Integer> list1 = new ArrayList<>();    List<Integer> list2 = new ArrayList<>();    List<Integer> list3 = new ArrayList<>();
    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list1::add)));    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list2::add)));    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list3::add)));
    eventloop.run();
    System.out.println("First list: " + list1);    System.out.println("Second list: " + list2);    System.out.println("Third list: " + list3);  }}

Смотрите этот пример на GitHub.

Пример декодера ByteBufs

ByteBufsDecoder позволяет эффективно работать с ByteBufs и декодировать хранящиеся в них данные для дальнейшей обработки. В данном примере BinaryChannelSupplier предоставит строку, декодированную и разобранную из ByteBuf.

public final class ByteBufsDecoderExample {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    List<ByteBuf> letters = asList(wrapAscii("H"), wrapAscii("e"), wrapAscii("l"), wrapAscii("l"), wrapAscii("o"));    ByteBufsDecoder<String> decoder = bufs -> {      if (!bufs.hasRemainingBytes(5)) {        System.out.println("Not enough bytes to decode message");        return null;      }      return bufs.takeExactSize(5).asString(UTF_8);    };
    BinaryChannelSupplier.of(ChannelSupplier.ofList(letters)).decode(decoder)        .whenResult(x -> System.out.println(x));
    eventloop.run();  }}

Пример Channel File

Этот пример демонстрирует, как работать с файлами асинхронно, используя Promises и встроенные потребители и поставщики CSP. В этом примере в файл записываются две строки с ChannelFileWriter, затем считывает и выводит их, используя ChannelFileReader

private static @NotNull Promise<Void> writeToFile() {  return ChannelSupplier.of(      ByteBufStrings.wrapAscii("Hello, this is example file\n"),      ByteBufStrings.wrapAscii("This is the second line of file\n"))      .streamTo(ChannelFileWriter.open(executor, PATH, WRITE));}
private static @NotNull Promise<Void> readFile() {  return ChannelFileReader.open(executor, PATH)      .then(cfr -> cfr.streamTo(ChannelConsumer.ofConsumer(buf -> System.out.print(buf.asString(UTF_8)))));
}

Если вы запустите пример, вы увидите содержимое созданного файла:

Hello, this is example fileThis is the second line of file

Полный пример смотрите на GitHub.