跳到主要内容

承诺

概述

许诺是ActiveJ异步编程模型的主要构件,可以与Java Futures相比较。 Promise 代表一个尚未完成的操作的结果。

特点

  • 与Java Futures不同, Promises ,被设计为在一个单一的事件循环线程中工作。
  • 承诺 是非常轻的
  • 没有多线程的开销
  • 每秒可处理数百万次呼叫
  • 用于组合多个承诺的强大的API

创造诺言

我们主要可以用基本的静态方法创建 Promise

  • of(T value) - 创建一个成功完成的承诺,如 CompletableFuture.completedFuture().
  • ofException(Exception e) - 创建一个异常完成的承诺。
  • complete() - 创建一个成功完成的 Promise<Void>,是 Promise.of(null)的快捷方式。
Promise<Integer> firstNumber = Promise.of(10);Promise.of("Hello World");Promise.ofException(new Exception("Something went wrong") )。

链式承诺

Promise将在某个未指定的时间成功或失败,你需要将在这两种情况下执行的方法连锁起来。

  • then(FunctionEx<T, Promise<U>>) - 返回一个新的Promise,该Promise是通过将原Promise的结果映射到另一个Promise而得到的。 如果原来的承诺被例外地完成了,将不应用映射功能。 类似于Java的 CompletionStage.thenCompose()。 用于将一个承诺的结果映射到另一个承诺。
  • then(SupplierEx<>) - 返回一个新的Promise,该Promise是通过调用一个提供的供应商的新Promise获得的。 如果最初的承诺被例外地完成,就不会有供应商被调用。 如果你可以忽略原始承诺的结果,请使用这些方法。
  • map(FunctionEx<T, U>) - 返回一个新的Promise,该Promise是通过将原始promise的结果映射到一些其他的值而得到。 如果原来的承诺被例外地完成了,将不应用映射功能。 类似于Java的 CompletionStage.thenApply()。 用于将一个承诺的结果映射到其他的值。
  • whenResult(ConsumerEx<T>) - 订阅给定的消费者,在原始Promise成功完成后执行。 类似于Java的 CompletionStage.thenAccept()

此外,为了处理错误,还提供了以下方法。

  • then(BiFunctionEx ) - 返回一个新的Promise,该Promise是通过将原始承诺的一个结果和一个异常映射到其他承诺而得到的。<T, Exception, Promise<U>> 如果最初的承诺被例外地完成,传递给映射bi函数的异常被保证为非空。 接受2个参数:一个原始承诺的结果和一个代表失败承诺的异常。
  • map(BiFunctionEx<T, Exception, U>) - 返回一个新的Promise,该Promise是通过将原始promise的一个结果和一个异常映射到其他值而得到的 。 如果这个承诺例外地完成了,那么传递给映射bi函数的异常就会被保证为非空。 接受2个参数:一个原始承诺的结果和一个代表失败承诺的异常。
  • whenException(ConsumerEx<Exception>) - 订阅给定消费者的expetion,在原始Promise完成后例外地执行。
  • whenException(RunnableEx) - 订阅给定的Runnable,在原始Promise完成后例外地执行。
  • whenComplete(BiConsumerEx<T, Exception>) - 订阅给定的生物消费者,以便在原始Promise完成后执行 (无论是成功还是例外)。 接受2个参数:一个原始承诺的结果和一个代表失败承诺的异常。
  • whenComplete(RunnableEx) - 订阅给定的runnable,以便在原始的Promise完成后执行 (无论是成功还是例外)。
note

每个承诺链方法都需要一个'Ex'函数作为参数。 这些是功能接口,类似于 java.util.function 包中的对应接口。 主要区别在于,每个'Ex'函数都可能抛出一个 Exception

如果一个 Ex 函数抛出一个被检查的异常,那么产生的承诺也会被例外地完成。 如果一个未检查的异常被抛出 ,而不是被抛出,一个承诺将不会被完成,未检查的异常将被重新抛出。

如果有多个异步调用,我们需要按顺序执行它们。 在这种情况下,你可以简单地将方法链 ,以创建一个序列。

doSomeProcess()    .whenResult(result -> System.out.printf("Result of some process is '%s'%n", result))    .whenException(e -> System.out.printf("Exception after some process is '%s'%n", e.getMessage()))    .map(s -> s.toLowerCase())    .map(result -> String.format("The mapped result is '%s'", result), e -> e.getMessage())    .whenResult(s -> System.out.println(s));

在GitHub上看到完整的例子

合并承诺

有些情况下,你需要执行几个 Promise,并将其结果结合起来。 为此, ,考虑到以下静态方法,从 承诺 类。

  • combine() - 返回一个新的 Promise ,当两个 Promises都完成后,以两个结果为参数执行。
  • all() - 返回一个 Promise ,当所有提供的承诺都完成时,就完成了。
  • any() - 返回第一个完成的 诺言之一
Promise<Integer> firstNumber = Promise.of(10);Promise<Integer> secondNumber = Promises.delay(2000, 100);
Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);result.whenResult(res -> System.out.println("The first result is " + res));
  • delay() - 延迟完成所提供的 Promise ,并定义一个时间段。
Promise<String> strPromise = Promises.delay("result", Duration.seconds(10))

优化功能

ActiveJ Promise 在很大程度上进行了GC优化。

  • 一个典型的 Promise 的内部表示由1-2个对象组成, 里面的字段最少。
  • Promise ,它将结果传递给它的订阅者,然后丢弃结果。

为了优化 Promises,有几个 Promise 接口的实现。

graph TD Promise --> AbstractPromise Promise --> CompleteExceptionallyPromise Promise --> CompletePromise AbstractPromise --> NextPromise AbstractPromise --> SetablePromise CompletePromise --> CompleteResultPromise CompletePromise --> CompleteNullPromise
  • Promise - 根接口,表示 承诺 行为。
  • SettablePromise - 可作为 Promises链的根。 允许在 Promises中包裹操作, ,甚至可以在实际完成之前手动完成。
  • AbstractPromise, NextPromise - 帮助类,用于创建无状态 Promises的链。 你可以把 ,这些链就像管道一样,把值传过去,但不存储。
  • CompletePromise - 一个抽象的类,表示一个成功完成的 Promise
  • CompleteExceptionallyPromise, CompleteResultPromise, CompleteNullPromise - 帮助类。

基准

我们将 ActiveJ PromiseJava CompletableFuture 在不同的场景中进行了比较。

  1. ActiveJ Promise/Java CompletableFuture 用一个承诺/未来执行操作。
  2. ActiveJ Promise/Java CompletableFuture 结合了几个承诺/期货。

我们使用JMH作为基准工具,在 AverageTime 模式下进行基准测试。 所有的测量都是以纳秒为单位。

ActiveJ Promise oneCallMeasureCnt: 10; Score: 12.952; Error: ± 0.693; Units: ns/op;
ActiveJ Promise combineMeasureCnt: 10; Score: 34.112; Error: ± 1.869; 单位: ns/op;
Java CompletableFuture oneCallMeasureCnt: 10; Score: 85.151; Error: ± 1.781; 单位: ns/op;
Java CompletableFuture combineMeasureCnt: 10; Score: 153.645; Error: ± 4.491; 单位: ns/op;

你可以在以下网站找到基准来源 GitHub

实例

note

要运行例子,你需要从GitHub克隆ActiveJ

git clone https://github.com/activej/activej

并将其作为一个Maven项目导入。 查看标签 v5.4.3。 在运行这些例子之前,先建立项目。 这些例子位于 activej/examples/core/promise。

承诺链示例

你可以创建 Promise的链,甚至在它们完成之前,你还不知道它们是成功完成 还是出现异常。 在这个例子中,我们有一个 doSomeProcess ,它返回一个 Promise ,有相等的 机会成功完成或出现异常。 因此,我们创建一个链,它将处理这两种情况。

@SuppressWarnings("Convert2MethodRef")public class PromiseChainExample {  private static final Eventloop eventloop = Eventloop.create().withCurrentThread();
  public static void main(String[] args) {    //[START REGION_1]    doSomeProcess()        .whenResult(result -> System.out.printf("Result of some process is '%s'%n", result))        .whenException(e -> System.out.printf("Exception after some process is '%s'%n", e.getMessage()))        .map(s -> s.toLowerCase())        .map(result -> String.format("The mapped result is '%s'", result), e -> e.getMessage())        .whenResult(s -> System.out.println(s));    //[END REGION_1]    Promise.complete()        .then(PromiseChainExample::loadData)        .whenResult(result -> System.out.printf("Loaded data is '%s'%n", result));    eventloop.run();  }
  private static Promise<String> loadData() {    return Promise.of("Hello World");  }
  public static Promise<String> doSomeProcess() {    return Promises.delay(1000, Math.random() > 0.5 ?        Promise.of("Hello World") :        Promise.ofException(new RuntimeException("Something went wrong")));  }}

如果你运行这个例子,你会收到这样的输出(如果 doSomeProcess 成功完成)。

加载的数据是'Hello World'某个进程的结果是'Hello World'映射的结果是'Hello World' 。

或者这样,如果它以异常结束。

加载的数据是'Hello World'某个过程后出现的异常是'出错了'出错了

请注意,第一行是

载入的数据是 "Hello World"。

这是由于我们在 doSomeProcess中设置了1秒的延迟。

在GitHub上看到完整的例子

承诺高级示例

你可以将几个 Promises,例如。

Promise<Integer> firstNumber = Promise.of(10);Promise<Integer> secondNumber = Promises.delay(2000, 100);
Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);result.whenResult(res -> System.out.println("The first result is " + res));

也有几种方法可以延迟 承诺

int someValue = 1000;int delay = 1000;     // in millisecondsint interval = 2000;  // also in millisecondsPromise<Integer> intervalPromise = Promises.interval(interval, Promise.of(someValue));Promise<Integer> schedulePromise = Promises.schedule(someValue * 2, Instant.now());Promise<Integer> delayPromise = Promises.delay(delay, someValue);
Promise<Integer> result = intervalPromise    .combine(schedulePromise, (first, second) -> first - second)    .combine(delayPromise, Integer::sum);
result.whenResult(res -> System.out.println("The second result is " + res));

在GitHub上看到完整的例子

承诺实例

Promises 是一个辅助类,可以有效地管理多个 Promise。 这个例子将展示三个用例。

  • 在下面的例子中,我们使用了 Promises loop,它类似于Java for loop,但具有异步功能, ,它由 Promise提供。
Promises.loop(0,    i -> i < 5,    i -> {      System.out.println("This is iteration #" + i);      return Promise.of(i + 1);    });

输出是。

带条件的循环:这是迭代#1这是迭代#2这是迭代#3这是迭代#4这是迭代#5

2.另一个例子使用 Promises toList 方法创建一个 Promises结果的列表。

Promises.toList(Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))    .whenResult(list -> System.out.println("Size of collected list: " + list.size() + "\nList: " + list));

输出是。

将**承诺**组收集到**承诺**的列表中,结果:收集的列表大小。6列表。[1, 2, 3, 4, 5, 6]

3.在最后一个例子中 Promises toArray 方法被利用,它将 promises 减少为所提供的数据类型的数组(在这个例子中, Integers)。

Promises.toArray(Integer.class, Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))    .whenResult(array -> System.out.println("Size of collected array: " + array.length + "\nArray: " + Arrays.toString(array)));

而最终的输出是。

将**承诺**组收集到**承诺**的数组中,结果:收集的数组大小。6阵列。[1, 2, 3, 4, 5, 6]

在GitHub上看到完整的例子

异步文件服务示例

另外,你可以使用 Promises ,与文件系统一起工作。 当你运行这个例子时。

private static @NotNull Promise<Void> writeToFile() {  try {    FileChannel channel = FileChannel.open(PATH, setOf(WRITE, APPEND));
    byte[] message1 = "Hello\n".getBytes();    byte[] message2 = "This is test file\n".getBytes();    byte[] message3 = "This is the 3rd line in file".getBytes();
    return fileService.write(channel, 0, message1, 0, message1.length)        .then(() -> fileService.write(channel, 0, message2, 0, message2.length))        .then(() -> fileService.write(channel, 0, message3, 0, message3.length))        .toVoid();  } catch (IOException e) {    return Promise.ofException(e);  }}
private static @NotNull Promise<ByteBuf> readFromFile() {  byte[] array = new byte[1024];  FileChannel channel;  try {    channel = FileChannel.open(PATH, setOf(READ));  } catch (IOException e) {    return Promise.ofException(e);  }
  return fileService.read(channel, 0, array, 0, array.length)      .map(bytesRead -> {        ByteBuf buf = ByteBuf.wrap(array, 0, bytesRead);        System.out.println(buf.getString(UTF_8));        return buf;      });}

......你会收到以下输出,它代表了所创建的文件的内容。

你好这是测试文件这是文件中的第三行

在GitHub上看到完整的例子