Promise
Overview
Promises are the primary building blocks in the ActiveJ asynchronous programming model and can be compared to Java Futures. A Promise represents the result of an operation that is not yet complete.
Features
- Unlike Java Futures, Promises were originally designed to work within a single reactor context (thread)
- Promises are lean and lightweight
- No multi-threading overhead
- Can handle millions of calls per second
- Rich API for combining multiple promises
Creating Promises
We can primarily create Promises with the basic static methods:
- of(T value) - creates a successfully completed promise, like CompletableFuture.completedFuture().
- ofException(Exception e) - creates an exceptionally completed promise.
- complete() - creates a successfully completed Promise<Void>, a shortcut to Promise.of(null).
Promise<Integer> firstNumber = Promise.of(10);
Promise<String> helloWorld = Promise.of("Hello World");
Promise<Object> exceptionalPromise = Promise.ofException(new Exception("Something went wrong"));
Promise<Void> completePromise = Promise.complete();
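For readers coming from the JDK, the same three starting points can be sketched with CompletableFuture alone. This is only a comparison that follows the analogy drawn above, not ActiveJ code: the class name is hypothetical, and CompletableFuture.failedFuture() assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;

class CompletedFuturesSketch {
    // JDK counterpart of Promise.of(10): already completed with a value.
    static CompletableFuture<Integer> value() {
        return CompletableFuture.completedFuture(10);
    }

    // JDK counterpart of Promise.ofException(...): already completed exceptionally.
    static CompletableFuture<Object> failure() {
        return CompletableFuture.failedFuture(new Exception("Something went wrong"));
    }

    // JDK counterpart of Promise.complete(): completed with a null result.
    static CompletableFuture<Void> done() {
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        System.out.println(value().join());                       // 10
        System.out.println(failure().isCompletedExceptionally()); // true
        System.out.println(done().join());                        // null
    }
}
```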
Chaining Promises
A promise will succeed or fail at some unspecified time, and you need to chain methods that will be executed in both cases:
- then(AsyncFunctionEx<T, U>) - returns a new Promise which is obtained by mapping the result of the original promise to some other promise. If the original promise is completed exceptionally, the mapping function will not be applied. Similar to CompletionStage.thenCompose() in Java.
- then(AsyncSupplierEx<U>) - returns a new Promise, which is obtained by calling the provided supplier of the new promise. If the original promise is completed exceptionally, the supplier will not be called. Use this method if you can ignore the result of the original promise.
- map(FunctionEx<T, U>) - returns a new Promise obtained by mapping the result of the original promise to some other value. If the original promise is completed exceptionally, the mapping function will not be applied. Similar to CompletionStage.thenApply() in Java.
- whenResult(ConsumerEx<T>) - subscribes this consumer to be executed after successful completion of the original Promise. Similar to CompletionStage.thenAccept() in Java.
- whenResult(RunnableEx) - subscribes this runnable to be executed after successful completion of the original Promise. Similar to CompletionStage.thenRun() in Java. Use this method if you can ignore the result of the original promise.
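The JDK methods named in the list above can be put side by side in a short sketch. It uses CompletableFuture only, so it illustrates the analogy rather than the ActiveJ API; lengthAsync and the class name are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainingSketch {
    // Hypothetical asynchronous step: returns a future rather than a plain value.
    static CompletableFuture<Integer> lengthAsync(String s) {
        return CompletableFuture.completedFuture(s.length());
    }

    static List<String> run(CompletableFuture<String> source) {
        List<String> log = new ArrayList<>();
        source
            .thenApply(String::toUpperCase)           // value -> value, like map(FunctionEx)
            .thenCompose(ChainingSketch::lengthAsync) // value -> future, like then(AsyncFunctionEx)
            .thenAccept(n -> log.add("length=" + n)); // consume the result, like whenResult(ConsumerEx)
        return log;
    }

    public static void main(String[] args) {
        // Successful source: every stage runs.
        System.out.println(run(CompletableFuture.completedFuture("hello"))); // [length=5]
        // Failed source: none of the mapping stages is applied.
        System.out.println(run(CompletableFuture.failedFuture(new Exception("boom")))); // []
    }
}
```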
In addition, the following methods are provided for error handling:
- then(AsyncBiFunctionEx<T, Exception, U>) - returns a new Promise, which is obtained by mapping the result and exception of the original promise to some other promise. If the original promise is completed exceptionally, the exception passed to the mapping bi-function is guaranteed not to be null. The bi-function takes 2 arguments: the result of the original promise and the exception representing the failed promise.
- then(AsyncFunctionEx<T, U>, AsyncFunctionEx<Exception, U>) - returns a new Promise, which is obtained by mapping the result or exception of the original promise to some other promise. If the original promise is completed successfully, the first function will be called. Otherwise, the second function will be called with a non-null exception.
- map(BiFunctionEx<T, Exception, U>) - returns a new Promise, obtained by mapping the result and exception of the original promise to some other value. If the original promise is completed exceptionally, the exception passed to the mapping bi-function is guaranteed not to be null. The bi-function takes 2 arguments: the result of the original promise and the exception representing the failed promise.
- map(FunctionEx<T, U>, FunctionEx<Exception, U>) - returns a new Promise, obtained by mapping the result or exception of the original promise to some other value. If the original promise is completed successfully, the first function will be called. Otherwise, the second function will be called with a non-null exception.
- whenException(ConsumerEx<Exception>) - subscribes this exception consumer to be executed after the exceptional completion of the original Promise.
- whenException(Class<Exception>, ConsumerEx<Exception>) - subscribes this exception consumer to be executed after the exceptional completion of the original Promise if the exception matches the class passed as the first argument or is a subclass of this class.
- whenException(RunnableEx) - subscribes this runnable to be executed after the exceptional completion of the original Promise.
- whenException(Class<Exception>, RunnableEx) - subscribes this runnable to be executed after the exceptional completion of the original Promise if the exception matches the class passed as the first argument or is a subclass of this class.
- whenComplete(BiConsumerEx<T, Exception>) - subscribes this bi-consumer to be executed after the completion (either successful or exceptional) of the original Promise. It takes 2 arguments: the result of the original promise and the exception representing the failed promise.
- whenComplete(ConsumerEx<T>, ConsumerEx<Exception>) - subscribes one of these consumers to be executed after the completion (either successful or exceptional) of the original Promise. If the original promise is completed successfully, the first consumer will be called. Otherwise, the second consumer will be called with a non-null exception.
- whenComplete(RunnableEx) - subscribes this runnable to be executed after the completion (either successful or exceptional) of the original Promise.
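The two-argument and failure-only styles described above have rough JDK analogues in CompletableFuture.handle() and exceptionally(). The sketch below shows only those JDK calls, as an approximation of the idea and not of the ActiveJ signatures; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ErrorHandlingSketch {
    // handle() receives (result, exception), in the spirit of map(BiFunctionEx<T, Exception, U>):
    // on success the exception is null, on failure the result is null.
    static String describe(CompletableFuture<String> source) {
        return source
            .handle((result, e) -> e == null ? "ok: " + result : "failed: " + e.getMessage())
            .join();
    }

    // exceptionally() maps only the failure and leaves a successful result untouched.
    static String orFallback(CompletableFuture<String> source) {
        return source.exceptionally(e -> "fallback").join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> ok = CompletableFuture.completedFuture("data");
        CompletableFuture<String> failed = CompletableFuture.failedFuture(new Exception("boom"));

        System.out.println(describe(ok));       // ok: data
        System.out.println(describe(failed));   // failed: boom
        System.out.println(orFallback(ok));     // data
        System.out.println(orFallback(failed)); // fallback
    }
}
```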
Each promise chain method takes an 'Ex' function as its argument. These are functional interfaces similar to their counterparts in the java.util.function package. The main difference is that each 'Ex' function can throw an Exception.
If the Ex function throws a checked exception, the resulting promise will also be completed exceptionally. If an unchecked exception is thrown instead, the promise will not be completed and the unchecked exception will be rethrown as-is.
Additionally, the then chaining methods take asynchronous versions of such functions. These act as regular functions but return Promise<T> instead of T.
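The checked versus unchecked rule can be illustrated with a small JDK-only sketch. ThrowingFunction and mapValue are hypothetical stand-ins written for this illustration; they mimic the behaviour described above and are not ActiveJ's FunctionEx or its implementation.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

class ExFunctionSketch {
    // Hypothetical stand-in for an 'Ex' function: like java.util.function.Function,
    // but allowed to throw a checked exception.
    @FunctionalInterface
    interface ThrowingFunction<T, R> {
        R apply(T value) throws Exception;
    }

    // Applies fn to value. A checked exception becomes a failed future;
    // an unchecked exception is not captured and propagates to the caller.
    static <T, R> CompletableFuture<R> mapValue(T value, ThrowingFunction<T, R> fn) {
        try {
            return CompletableFuture.completedFuture(fn.apply(value));
        } catch (RuntimeException e) {
            throw e;                                  // unchecked: rethrown as-is
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e); // checked: completes exceptionally
        }
    }

    static CompletableFuture<Integer> checkedFailure() {
        return mapValue("x", s -> { throw new IOException("disk"); });
    }

    static boolean uncheckedIsRethrown() {
        try {
            mapValue("x", Integer::parseInt); // NumberFormatException is unchecked
            return false;
        } catch (NumberFormatException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> parsed = mapValue("42", Integer::parseInt);
        System.out.println(parsed.join());                               // 42
        System.out.println(checkedFailure().isCompletedExceptionally()); // true
        System.out.println(uncheckedIsRethrown());                       // true
    }
}
```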
If several asynchronous calls need to be executed in order, you can simply chain the methods to create a sequence:
doSomeProcess()
.whenResult(result -> System.out.printf("Result of some process is '%s'%n", result))
.whenException(e -> System.out.printf("Exception after some process is '%s'%n", e.getMessage()))
.map(s -> s.toLowerCase())
.map(result -> String.format("The mapped result is '%s'", result), e -> e.getMessage())
.whenResult(s -> System.out.println(s));
Combine Promises
There are cases when it is necessary to execute several Promises and combine their results. To do this, consider the following methods of Promise and the Promises helper class:
- combine() - returns a new Promise which is completed once both Promises are completed, by applying the given function to the two results.
- all() - returns a Promise that completes when all given promises are completed.
- any() - returns one of the first completed Promises.
Promise<Integer> firstNumber = Promise.of(10);
Promise<Integer> secondNumber = Promises.delay(2000, 100);
Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);
result.whenResult(res -> System.out.println("The first result is " + res));
- delay() - delays completion of the Promise for a certain period of time.
Promise<String> strPromise = Promises.delay(Duration.ofSeconds(10), "result");
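For comparison, the same four ideas (combine, all, any, delay) can be sketched with CompletableFuture. This is a JDK-only illustration under the analogy used throughout this page; delayed() is a hypothetical helper, and CompletableFuture.delayedExecutor() assumes Java 9 or later. Unlike ActiveJ, the JDK version relies on a background timer thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class CombineSketch {
    // Hypothetical helper: completes with the given value after delayMillis.
    static <T> CompletableFuture<T> delayed(long delayMillis, T value) {
        return CompletableFuture.supplyAsync(() -> value,
            CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Like combine(): waits for both inputs, then applies the function.
    static int sum() {
        CompletableFuture<Integer> first = CompletableFuture.completedFuture(10);
        CompletableFuture<Integer> second = delayed(200, 100);
        return first.thenCombine(second, Integer::sum).join();
    }

    // Like all(): completes only when every input has completed.
    static boolean allDone() {
        CompletableFuture<Integer> a = delayed(100, 1);
        CompletableFuture<Integer> b = delayed(150, 2);
        CompletableFuture.allOf(a, b).join();
        return a.isDone() && b.isDone();
    }

    // Like any(): completes as soon as one input has completed.
    static Object firstCompleted() {
        return CompletableFuture.anyOf(
            delayed(500, "slow"),
            CompletableFuture.completedFuture("fast")).join();
    }

    public static void main(String[] args) {
        System.out.println(sum());            // 110
        System.out.println(allDone());        // true
        System.out.println(firstCompleted()); // fast
    }
}
```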
Optimization Features
ActiveJ Promise is heavily GC-optimized:
- The internal representation of a typical Promise consists of 1-2 objects with a bare minimum number of fields inside
- After completion, Promise passes the result to its subscribers and then discards it
To optimize Promises, there are several implementations of the Promise interface:
- Promise - the root interface that represents the behaviour of promises.
- SettablePromise - can be used as the root for a chain of Promises. Allows you to wrap operations in Promises, can be completed manually even before actual completion.
- AbstractPromise, NextPromise - helper classes for creating chains of stateless Promises. You can treat these chains as pipes that pass values but do not store them.
- CompletePromise - an abstract class that represents a successfully completed Promise.
- CompleteExceptionallyPromise, CompleteResultPromise, CompleteNullPromise - helper classes.
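The idea of a manually completed root for a chain can be illustrated with the JDK: an incomplete CompletableFuture plays a role loosely comparable to the one described for SettablePromise. The sketch below is only that analogy and does not use or reproduce the ActiveJ classes; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ManualCompletionSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> root = new CompletableFuture<>(); // not completed yet

        // The chain is attached before any result exists.
        root.thenApply(String::length)
            .thenAccept(n -> log.add("length=" + n));
        log.add("chain attached");

        // Manual completion pushes the value through the chain.
        root.complete("result");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [chain attached, length=6]
    }
}
```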
Benchmarks
We've compared ActiveJ Promise with Java CompletableFuture in various scenarios:
- ActiveJ Promise/Java CompletableFuture executes operations with one promise/future.
- ActiveJ Promise/Java CompletableFuture combines several promises/futures.
We used JMH as the benchmark tool and ran the benchmarks in AverageTime mode. All measurements are presented in nanoseconds.
ActiveJ Promise oneCallMeasure
Cnt: 10; Score: 12.952; Error: ± 0.693; Units: ns/op;
ActiveJ Promise combineMeasure
Cnt: 10; Score: 34.112; Error: ± 1.869; Units: ns/op;
Java CompletableFuture oneCallMeasure
Cnt: 10; Score: 85.151; Error: ± 1.781; Units: ns/op;
Java CompletableFuture combineMeasure
Cnt: 10; Score: 153.645; Error: ± 4.491; Units: ns/op;
You can find benchmark sources on GitHub.
Examples
To run the examples, clone ActiveJ from GitHub:
git clone https://github.com/activej/activej
Then import it as a Maven project and check out tag v6.0-beta2. Before running the examples, build the project.
These examples are located at activej/examples/core/promise.
PromiseChainExample
You can create chains of Promises even before they are completed and you do not yet know whether they will succeed or fail.
In this example we have a doSomeProcess method that returns a Promise that has an equal chance of succeeding or failing. So we create a chain that handles both cases:
import io.activej.eventloop.Eventloop;
import io.activej.promise.Promise;
import io.activej.promise.Promises;

@SuppressWarnings("Convert2MethodRef")
public class PromiseChainExample {
    // Eventloop bound to the current thread; all promises below run in it
    private static final Eventloop eventloop = Eventloop.builder()
        .withCurrentThread()
        .build();

    public static void main(String[] args) {
        //[START REGION_1]
        doSomeProcess()
            .whenResult(result -> System.out.printf("Result of some process is '%s'%n", result))
            .whenException(e -> System.out.printf("Exception after some process is '%s'%n", e.getMessage()))
            .map(s -> s.toLowerCase())
            .map(result -> String.format("The mapped result is '%s'", result), e -> e.getMessage())
            .whenResult(s -> System.out.println(s));
        //[END REGION_1]

        Promise.complete()
            .then(PromiseChainExample::loadData)
            .whenResult(result -> System.out.printf("Loaded data is '%s'%n", result));

        eventloop.run();
    }

    private static Promise<String> loadData() {
        return Promise.of("Hello World");
    }

    // Completes after 1 second, successfully or exceptionally with equal probability
    public static Promise<String> doSomeProcess() {
        return Promises.delay(1000, Math.random() > 0.5 ?
            Promise.of("Hello World") :
            Promise.ofException(new RuntimeException("Something went wrong")));
    }
}
If you run the example, you will receive either this output (if doSomeProcess finishes successfully):
Loaded data is 'Hello World'
Result of some process is 'Hello World'
The mapped result is 'hello world'
Or this, if it finishes with an exception:
Loaded data is 'Hello World'
Exception after some process is 'Something went wrong'
Something went wrong
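Note that the first line is always "Loaded data is 'Hello World'". This is due to the 1 second delay we set up in doSomeProcess: loadData completes immediately, while the result of doSomeProcess arrives only after the delay.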
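The ordering effect is not specific to ActiveJ. A JDK-only sketch shows the same thing: a callback on an already completed future runs before a callback on a delayed one, even when the delayed chain is registered first. The class name is hypothetical, the delay is shortened to 200 ms, and the JDK version uses a background timer thread rather than a single eventloop thread.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class OrderingSketch {
    static List<String> run() {
        List<String> order = new CopyOnWriteArrayList<>();

        // Registered first, but completes only after a delay.
        CompletableFuture<Void> delayed = CompletableFuture
            .supplyAsync(() -> "Hello World",
                CompletableFuture.delayedExecutor(200, TimeUnit.MILLISECONDS))
            .thenAccept(s -> order.add("delayed: " + s));

        // Registered second, but already complete, so its callback runs at once.
        CompletableFuture.completedFuture("Hello World")
            .thenAccept(s -> order.add("immediate: " + s));

        delayed.join(); // wait for the delayed chain before reading the log
        return order;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [immediate: Hello World, delayed: Hello World]
    }
}
```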
PromiseAdvancedExample
You can combine several Promises, for example:
Promise<Integer> firstNumber = Promise.of(10);
Promise<Integer> secondNumber = Promises.delay(2000, 100);
Promise<Integer> result = firstNumber.combine(secondNumber, Integer::sum);
result.whenResult(res -> System.out.println("The first result is " + res));
There are also several ways to delay a Promise:
int someValue = 1000;
int delay = 1000; // in milliseconds
int interval = 2000; // also in milliseconds
Promise<Integer> intervalPromise = Promises.interval(interval, Promise.of(someValue));
Promise<Integer> schedulePromise = Promises.schedule(Instant.now(), someValue * 2);
Promise<Integer> delayPromise = Promises.delay(delay, someValue);
Promise<Integer> result = intervalPromise
.combine(schedulePromise, (first, second) -> first - second)
.combine(delayPromise, Integer::sum);
result.whenResult(res -> System.out.println("The second result is " + res));
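To make the arithmetic of the two results explicit, the sketch below reproduces only the value flow with CompletableFuture and ignores timing. It assumes that interval, schedule and delay change only when a promise completes, not the value it carries; the class and variable names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class CombineArithmeticSketch {
    // First snippet: 10 combined with 100 using Integer::sum.
    static int firstResult() {
        CompletableFuture<Integer> firstNumber = CompletableFuture.completedFuture(10);
        CompletableFuture<Integer> secondNumber = CompletableFuture.completedFuture(100);
        return firstNumber.thenCombine(secondNumber, Integer::sum).join();
    }

    // Second snippet: (someValue - someValue * 2) + someValue.
    static int secondResult() {
        int someValue = 1000;
        CompletableFuture<Integer> intervalValue = CompletableFuture.completedFuture(someValue);
        CompletableFuture<Integer> scheduleValue = CompletableFuture.completedFuture(someValue * 2);
        CompletableFuture<Integer> delayValue = CompletableFuture.completedFuture(someValue);
        return intervalValue
            .thenCombine(scheduleValue, (first, second) -> first - second) // 1000 - 2000 = -1000
            .thenCombine(delayValue, Integer::sum)                         // -1000 + 1000 = 0
            .join();
    }

    public static void main(String[] args) {
        System.out.println("The first result is " + firstResult());   // The first result is 110
        System.out.println("The second result is " + secondResult()); // The second result is 0
    }
}
```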
PromisesExamples
Promises is a helper class which allows you to efficiently manage multiple Promises. This example will demonstrate three use cases.
1. In the following example we use the Promises loop method, which is similar to Java's for loop but has the asynchronous capabilities that Promise provides:
Promises.loop(0,
i -> i < 5,
i -> {
System.out.println("This is iteration #" + i);
return Promise.of(i + 1);
});
The output is:
Looping with condition:
This is iteration #0
This is iteration #1
This is iteration #2
This is iteration #3
This is iteration #4
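The shape of such an asynchronous loop can be sketched with the JDK by recursing through thenCompose(). The loop() helper below is hypothetical and written for illustration; it assumes the condition is tested before each call of the body and is not the ActiveJ implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

class AsyncLoopSketch {
    // Hypothetical helper: repeats the asynchronous step while the condition holds
    // and completes with the first value that fails the condition.
    static <T> CompletableFuture<T> loop(T seed, Predicate<T> condition,
                                         Function<T, CompletableFuture<T>> next) {
        if (!condition.test(seed)) {
            return CompletableFuture.completedFuture(seed);
        }
        return next.apply(seed).thenCompose(value -> loop(value, condition, next));
    }

    // Records every value the body is called with.
    static List<Integer> visited() {
        List<Integer> seen = new ArrayList<>();
        AsyncLoopSketch.<Integer>loop(0, i -> i < 5, i -> {
            seen.add(i);
            return CompletableFuture.completedFuture(i + 1);
        }).join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(visited()); // [0, 1, 2, 3, 4]
    }
}
```

Because each step here is already complete, the recursion runs on the caller's stack; a very long loop built this way would need a trampoline or genuinely asynchronous steps.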
2. Another example collects the results of several Promises into a list using the Promises toList method:
Promises.toList(Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))
.whenResult(list -> System.out.println("Size of collected list: " + list.size() + "\nList: " + list));
The output is:
Collecting group of **Promises** to list of **Promises**' results:
Size of collected list: 6
List: [1, 2, 3, 4, 5, 6]
3. The last example uses the Promises toArray method, which reduces promises to an array of data of a given type (in this case, Integers):
Promises.toArray(Integer.class, Promise.of(1), Promise.of(2), Promise.of(3), Promise.of(4), Promise.of(5), Promise.of(6))
.whenResult(array -> System.out.println("Size of collected array: " + array.length + "\nArray: " + Arrays.toString(array)));
And the final output is:
Collecting group of **Promises** to array of **Promises**' results:
Size of collected array: 6
Array: [1, 2, 3, 4, 5, 6]
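The JDK has no direct toList counterpart, but the same reduction can be sketched on top of CompletableFuture.allOf(). The toList() helper below is hypothetical and only illustrates the idea of collecting results in input order; it is not how ActiveJ implements it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class ToListSketch {
    // Hypothetical helper: completes with all results, in input order,
    // once every future in the list is done.
    static <T> CompletableFuture<List<T>> toList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(ignored -> futures.stream()
                .map(CompletableFuture::join) // does not block: all futures are complete here
                .collect(Collectors.toList()));
    }

    static List<Integer> demo() {
        List<CompletableFuture<Integer>> inputs = List.of(
            CompletableFuture.completedFuture(1),
            CompletableFuture.completedFuture(2),
            CompletableFuture.completedFuture(3));
        return toList(inputs).join();
    }

    public static void main(String[] args) {
        List<Integer> list = demo();
        System.out.println("Size of collected list: " + list.size()); // Size of collected list: 3
        // An array can be derived from the collected list.
        Integer[] array = list.toArray(new Integer[0]);
        System.out.println("Array: " + Arrays.toString(array));       // Array: [1, 2, 3]
    }
}
```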
AsyncFileServiceExample
You can also use Promises to work with the file system. When you run this example:
private static Promise<Void> writeToFile() {
    try {
        FileChannel channel = FileChannel.open(PATH, Set.of(WRITE, APPEND));

        byte[] message1 = "Hello\n".getBytes();
        byte[] message2 = "This is test file\n".getBytes();
        byte[] message3 = "This is the 3rd line in file".getBytes();

        return FILE_SERVICE.write(channel, 0, message1, 0, message1.length)
            .then(() -> FILE_SERVICE.write(channel, 0, message2, 0, message2.length))
            .then(() -> FILE_SERVICE.write(channel, 0, message3, 0, message3.length))
            .toVoid();
    } catch (IOException e) {
        return Promise.ofException(e);
    }
}

private static Promise<ByteBuf> readFromFile() {
    byte[] array = new byte[1024];
    FileChannel channel;
    try {
        channel = FileChannel.open(PATH, Set.of(READ));
    } catch (IOException e) {
        return Promise.ofException(e);
    }

    return FILE_SERVICE.read(channel, 0, array, 0, array.length)
        .map(bytesRead -> {
            ByteBuf buf = ByteBuf.wrap(array, 0, bytesRead);
            System.out.println(buf.getString(UTF_8));
            return buf;
        });
}
... you will get the following output, which represents the contents of the created file:
Hello
This is test file
This is the 3rd line in file
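The write-then-write-then-write pattern can also be tried without ActiveJ. The sketch below chains three appends with thenCompose() on a temporary file and reads the content back; it uses only JDK classes (Files.readString assumes Java 11 or later), and the class, the append() helper and the temporary file name are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.WRITE;

class ChainedFileWritesSketch {
    // Hypothetical helper: appends the text on a background thread.
    static CompletableFuture<Void> append(FileChannel channel, String text) {
        return CompletableFuture.runAsync(() -> {
            try {
                ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(UTF_8));
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    static String writeThenRead() {
        try {
            Path path = Files.createTempFile("promise-sketch", ".txt");
            try (FileChannel channel = FileChannel.open(path, WRITE, APPEND)) {
                // Each write starts only after the previous one has completed.
                append(channel, "Hello\n")
                    .thenCompose(v -> append(channel, "This is test file\n"))
                    .thenCompose(v -> append(channel, "This is the 3rd line in file"))
                    .join();
            }
            String content = Files.readString(path, UTF_8);
            Files.delete(path);
            return content;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead());
    }
}
```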