跳到主要内容

CSP

概述

CSP(代表通信顺序过程)在异步数据供应商和消费者之间提供顺序的I/O通信。 它用于数据的异步流。 CSP的灵感来自Go语言通道。

特点。

  • 高性能和高吞吐速度
  • 为处理中等大小的对象进行了优化(如 ByteBufs)。
  • CSP已经达到DSL,它提供了一个简单的编程模型
  • 有一个异步的背压管理

渠道供应商和渠道消费者

CSP的通信是通过以下方式进行的 渠道供应商渠道消费者"。,它们分别提供和接受一些数据。 对这些通道的每个连续请求都应该在前一个请求结束后才调用。 CSP使用 承诺 来管理。

ChannelSupplier 有一个 get() 方法,返回一个 Promise ,提供的值。 在这个 Promise 完成之前,无论是有结果还是有异常, get() 方法都不应该被再次调用。 还要注意,如果 get() 返回 Promise of null,这代表流的结束,不应要求该供应商提供其他数据。

ChannelConsumer 有一个 accept(@Nullable T value) 方法,返回一个 Promise of null 作为接受完成的标志。 在这个 Promise 完成之前, accept() 方法不应该被再次调用。 通过与 ChannelSupplier的类比,如果接受一个 null 的值,它就代表了流的结束。

下面是 ChannelSupplierChannelConsumer之间的通信实例。

protected void doProcess() {  input.get()      .whenResult(data -> {        if (data == null) {          output.acceptEndOfStream()              .whenResult(this::completeProcess);        } else {          data = data.toUpperCase() + '(' + data.length() + ')';
          output.accept(data)              .whenResult(this::doProcess);        }      })      .whenException(Throwable::printStackTrace);}

通道队列

CSP的另一个重要概念是 通道队列 接口和它的实现。 通道缓冲区通道零缓冲区(ChannelZeroBuffer. 它们在 消费者供应商 之间提供通信,并允许他们在需要时创建这些管道的链子。 基本上,这些缓冲区在队列获得空闲空间时,将对象从 ChannelConsumer 传递到 ChannelSupplier。 这一过程由 Promises控制。 你可以手动设置 ChannelBuffer的大小ChannelZeroBuffer 不存储任何数值,只是将它们从 ChannelConsumerChannelSupplier逐一传递。 下面是一个处理项目缓冲区的简单例子。

public void accept(T item) {    buffer.add(item);    if (buffer.isSaturated() ) {        getSupplier().suspend();    }}
void produce() {    while (!buffer.isEmpty() ) {        T item = buffer.poll();        if (item != null) {            send(item);        } else {            sendEndOfStream();        }    }}

与Datastream的比较

CSP与 Datastream 模块有很多共同之处。 虽然它们都是为I/O处理而设计的,但有几个重要的区别。

数据流CSP
俯视。极低:流可以用1个虚拟调用启动,短路评估优化了性能没有短路评估,开销较大
吞吐速度。速度极快 (360 880 548 ops/sec)快 (105 932 203 ops/sec),但比Datastream慢。
编程模式。更加复杂简单而方便

为了提供最大的效率,ActiveJ广泛利用了CSP和Datastream的组合。 为此, ChannelSupplier, ChannelConsumer流动供应商"。流水线消费者"(StreamConsumer)。transformWith() 方法和特殊的 Transformer接口。 使用这些方法和接口,你可以将通道无缝地转换为其他通道或数据流,反之亦然,创建这种转换的链。

基准

我们已经测量了CSP的性能(ChannelSupplier 流50M Integer 对象到 ChannelConsumer 场景),并得到以下结果。

时间:4720ms;平均时间。472.0ms;最佳时间。469ms;最差的时间。475ms; 每秒操作数: 105 932 203

我们还测量了同时使用CSP和Datastream的TCP服务器性能,得到的平均结果是: 47 495 905次/秒

实例

note

要运行这些例子,你需要从GitHub上克隆ActiveJ。

git clone https://github.com/activej/activej

并将其作为一个Maven项目导入。 查看标签 v5.4.3。 在运行这些例子之前,先建立项目。 这些例子位于 activej/examples/core/csp

基本通道实例

频道示例 展示了供应商和消费者之间的互动,使用 串流 和一些助手方法:

private static void supplierOfValues() {  ChannelSupplier.of("1", "2", "3", "4", "5")      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}
private static void supplierOfList(List<String> list) {  ChannelSupplier.ofList(list)      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}
private static void map() {  ChannelSupplier.of(1, 2, 3, 4, 5)      .map(integer -> integer + " times 10 = " + integer * 10)      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}
private static void toCollector() {  ChannelSupplier.of(1, 2, 3, 4, 5)      .toCollector(Collectors.toList())      .whenResult(x -> System.out.println(x));}
private static void filter() {  ChannelSupplier.of(1, 2, 3, 4, 5, 6)      .filter(integer -> integer % 2 == 0)      .streamTo(ChannelConsumer.ofConsumer(System.out::println));}

因此,如果你运行这个例子,你会收到以下输出。

12345一个两个三个1次10 = 102次10 = 203次10 = 304次10 = 405次10 = 50[1, 2, 3, 4, 5]246

在GitHub上看到完整的例子

CSP实例

这个例子代表了一个 AsyncProcess ChannelSupplierChannelConsumer之间。 在这个示例 通道供应商 表示输入 通道消费者 - 输出:

public final class CspExample extends AbstractCommunicatingProcess implements WithChannelTransformer<CspExample, String, String> {  private ChannelSupplier<String> input;  private ChannelConsumer<String> output;
  @Override  public ChannelOutput<String> getOutput() {    return output -> {      this.output = output;      if (this.input != null && this.output != null) startProcess();    };  }
  @Override  public ChannelInput<String> getInput() {    return input -> {      this.input = input;      if (this.input != null && this.output != null) startProcess();      return getProcessCompletion();    };  }
  @Override  //[START REGION_1]  protected void doProcess() {    input.get()        .whenResult(data -> {          if (data == null) {            output.acceptEndOfStream()                .whenResult(this::completeProcess);          } else {            data = data.toUpperCase() + '(' + data.length() + ')';
            output.accept(data)                .whenResult(this::doProcess);          }        })        .whenException(Throwable::printStackTrace);  }  //[END REGION_1]
  @Override  protected void doClose(Exception e) {    System.out.println("Process has been closed with exception: " + e);    input.closeEx(e);    output.closeEx(e);  }
  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    CspExample process = new CspExample();    ChannelSupplier.of("hello", "world", "nice", "to", "see", "you")        .transformWith(process)        .streamTo(ChannelConsumer.ofConsumer(System.out::println));
    eventloop.run();  }}

这个过程接收一个字符串,将其设置为大写字母,并在括号内加上字符串的长度。

hello(5)world(5)nice(4)to(2)see(3)you(3)

在 GitHub 上查看这个示例

通道缓冲器实例

正如之前提到的 ,有两个 ChannelQueue 实现: ChannelBufferChannelZeroBuffer, ,它们都是管理提供者和供应商之间的通信。 你可以手动设置 ChannelBuffer的大小,而 ChannelZeroBuffer 的大小总是0。

为了理解所有这些缓冲区是如何工作的,让我们举一个简单的例子。 假设有一个 奶奶 ,想给她的 孙子 25个 苹果s。 这是相当多的,所以她首先把 苹果s放在一个大 ,可以同时放置10个苹果。 当 盘子孙子 ,应该先拿至少一个苹果,之后 奶奶 ,才能把一个新的 苹果盘子

static final class ChannelBufferStream {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    ChannelBuffer<Integer> plate = new ChannelBuffer<>(5, 10);    ChannelSupplier<Integer> granny = plate.getSupplier();    Promises.loop(0,        apple -> apple < 25,        apple -> plate.put(apple).map($ -> {          System.out.println("Granny gives apple   #" + apple);          return apple + 1;        }));    granny.streamTo(ChannelConsumer.ofConsumer(apple -> System.out.println("Grandson takes apple #" + apple)));    eventloop.run();  }}

第二天, 奶奶 ,想再把 苹果s给她的孙子 ,但这次只有10个 苹果。 因此,并不真正需要盘子: 奶奶 ,可以简单地将 苹果 ,逐一传递给她的 孙子

static final class ChannelBufferZeroExample {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    ChannelQueue<Integer> buffer = new ChannelZeroBuffer<>();    ChannelSupplier<Integer> granny = buffer.getSupplier();
    Promises.loop(0,        apple -> apple < 10,        apple -> buffer.put(apple).map($ -> {          System.out.println("Granny gives apple   #" + apple);          return apple + 1;        }));
    granny.streamTo(ChannelConsumer.<Integer>ofConsumer(apple ->        System.out.println("Grandson takes apple #" + apple)).async());
    eventloop.run();  }}

在GitHub上看到这个例子

通道分配器实例

在这个例子中,我们使用预定义的 频道分割器. 分割器允许将数据从一个输入分割到几个输出。 在我们的案例中,输出将被分割成三个 ChannelConsumers

public class SplitterExample {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();    List<Integer> integers = Stream.iterate(1, i -> i + 1)        .limit(5)        .collect(Collectors.toList());
    ChannelSplitter<Integer> splitter = ChannelSplitter.create(ChannelSupplier.ofList(integers));
    List<Integer> list1 = new ArrayList<>();    List<Integer> list2 = new ArrayList<>();    List<Integer> list3 = new ArrayList<>();
    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list1::add)));    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list2::add)));    splitter.addOutput().set(ChannelConsumer.of(AsyncConsumer.of(list3::add)));
    eventloop.run();
    System.out.println("First list: " + list1);    System.out.println("Second list: " + list2);    System.out.println("Third list: " + list3);  }}

在GitHub上看到这个例子

ByteBufs解码器实例

ByteBufsDecoder 允许有效地处理 ByteBufs 并对存储在其中的数据进行解码以进一步处理。 在这个例子中 二进制通道供应商(BinaryChannelSupplier 将提供一个从ByteBuf解码和解析的String。

public final class ByteBufsDecoderExample {  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withCurrentThread();
    List<ByteBuf> letters = asList(wrapAscii("H"), wrapAscii("e"), wrapAscii("l"), wrapAscii("l"), wrapAscii("o"));    ByteBufsDecoder<String> decoder = bufs -> {      if (!bufs.hasRemainingBytes(5)) {        System.out.println("Not enough bytes to decode message");        return null;      }      return bufs.takeExactSize(5).asString(UTF_8);    };
    BinaryChannelSupplier.of(ChannelSupplier.ofList(letters)).decode(decoder)        .whenResult(x -> System.out.println(x));
    eventloop.run();  }}

通道文件实例

这个例子演示了如何使用Promises和CSP内置的消费者和供应者来异步处理文件。 这个例子在文件中写了两行,内容是 ChannelFileWriter,然后读取并打印出它们的利用。 频道文件读取器

private static @NotNull Promise<Void> writeToFile() {  return ChannelSupplier.of(      ByteBufStrings.wrapAscii("Hello, this is example file\n"),      ByteBufStrings.wrapAscii("This is the second line of file\n"))      .streamTo(ChannelFileWriter.open(executor, PATH, WRITE));}
private static @NotNull Promise<Void> readFile() {  return ChannelFileReader.open(executor, PATH)      .then(cfr -> cfr.streamTo(ChannelConsumer.ofConsumer(buf -> System.out.print(buf.asString(UTF_8)))));
}

如果你运行这个例子,你会看到创建的文件的内容。

你好,这是示例文件,这是文件的第二行

在GitHub上看到完整的例子