Basic Examples
To run the examples, clone ActiveJ from GitHub:
git clone https://github.com/activej/activej
Import it as a Maven project and check out tag v6.0-beta2. Build the project before running the examples.
The Simple RPC Example is located at activej/examples/cloud/rpc.
The RPC strategies examples are located at activej/cloud-rpc/src/test/RpcStrategiesTest.
Simple RPC Example
In the "Hello World" client and server RPC Example, the client sends a request containing the word "World" to the server. When the server receives it, it sends back a response that prepends "Hello " to the received word. If everything succeeds, we get the following output:
RPC result: Hello World
Let's take a look at the implementation:
public class RpcExample extends Launcher {
    private static final int SERVICE_PORT = 34765;

    @Inject
    private IRpcClient client;

    @Inject
    private RpcServer server;

    @Inject
    private Reactor reactor;

    @Provides
    NioReactor reactor() {
        return Eventloop.create();
    }

    @Provides
    RpcServer rpcServer(NioReactor reactor) {
        return RpcServer.builder(reactor)
                .withMessageTypes(String.class)
                .withHandler(String.class,
                        request -> Promise.of("Hello " + request))
                .withListenPort(SERVICE_PORT)
                .build();
    }

    @Provides
    IRpcClient rpcClient(NioReactor reactor) {
        return RpcClient.builder(reactor)
                .withMessageTypes(String.class)
                .withStrategy(server(new InetSocketAddress(SERVICE_PORT)))
                .build();
    }

    @ProvidesIntoSet
    Initializer<ServiceGraphModuleSettings> configureServiceGraph() {
        // add logical dependency so that service graph starts client only after it started the server
        return settings -> settings.withDependency(Key.of(IRpcClient.class), Key.of(RpcServer.class));
    }

    @Override
    protected Module getModule() {
        return ServiceGraphModule.create();
    }

    @Override
    protected void run() throws ExecutionException, InterruptedException {
        CompletableFuture<Object> future = reactor.submit(() ->
                client.sendRequest("World", 1000)
        );
        System.out.printf("%nRPC result: %s %n%n", future.get());
    }

    public static void main(String[] args) throws Exception {
        RpcExample example = new RpcExample();
        example.launch(args);
    }
}
The RpcExample class extends ActiveJ Launcher to help us manage the lifecycle of the application.
Next, we use the Dependency Injection library ActiveJ Inject to provide the RPC Server and RPC Client with the appropriate configurations and necessary dependencies. The RPC Client sends requests with a String message to the specified server according to the provided RPC strategy (in this case, a single RPC server).
For the RpcServer we define the message types to handle, a corresponding RpcRequestHandler, and a listen port.
Since we are extending Launcher, we also override 2 methods: getModule to provide ServiceGraphModule and run to describe the main logic of the example.
We also provide an initializer for ServiceGraphModuleSettings. As we launch the RPC client and server in the same launcher, we need to instruct ServiceGraph that the RPC client depends on the RPC server and must be started after the server has been started.
Finally, we define the main method, which launches our example.
You can find example sources on GitHub
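As a rough analogy for the request flow described above, the following plain-Java sketch replaces the reactor with a single-thread executor and the RPC handler with a local function. It uses only java.util.concurrent; HelloRpcSketch, HANDLER, and call are hypothetical names, and nothing here is ActiveJ API or involves real networking.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class HelloRpcSketch {
    // Stand-in for the server-side handler (assumption): prepends "Hello " to the request.
    static final Function<String, CompletableFuture<String>> HANDLER =
            request -> CompletableFuture.completedFuture("Hello " + request);

    // Hands the request to a single worker thread (the stand-in "reactor")
    // and blocks the calling thread until the reply is available.
    static String call(String request) {
        ExecutorService reactorThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> request, reactorThread)
                    .thenCompose(HANDLER);
            return future.join();
        } finally {
            reactorThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("RPC result: " + call("World")); // prints "RPC result: Hello World"
    }
}
```

The shape mirrors the run method: work is submitted to another thread, and the main thread waits on a CompletableFuture for the result.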
Round-Robin Strategy
ActiveJ RPC contains predefined strategies for distributing requests among RPC servers or shards of servers. Round-Robin is one of the simplest strategies: it cycles through the servers or shards one by one.
In this example, we create an RPC pool with 5 identical connections and define a Round-Robin strategy over them. Then we create a sender for the pool from that strategy. The 100 requests are distributed evenly among the servers, 20 per connection:
public void roundRobinTest() {
    RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
    RpcSenderStub connection1 = new RpcSenderStub();
    RpcSenderStub connection2 = new RpcSenderStub();
    RpcSenderStub connection3 = new RpcSenderStub();
    RpcSenderStub connection4 = new RpcSenderStub();
    RpcSenderStub connection5 = new RpcSenderStub();
    pool.put(address1, connection1);
    pool.put(address2, connection2);
    pool.put(address3, connection3);
    pool.put(address4, connection4);
    pool.put(address5, connection5);
    int iterations = 100;
    RpcStrategy strategy = RpcStrategies.roundRobin(servers(address1, address2, address3, address4, address5));

    RpcSender sender = strategy.createSender(pool);
    for (int i = 0; i < iterations; i++) {
        sender.sendRequest(new Object(), 50, ignore());
    }

    List<RpcSenderStub> connections =
            List.of(connection1, connection2, connection3, connection4, connection5);
    for (int i = 0; i < 5; i++) {
        assertEquals(iterations / 5, connections.get(i).getRequests());
    }
}
You can find example sources on GitHub
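The selection rule itself can be sketched in a few lines of plain Java. This is a minimal illustration of round-robin in general, not ActiveJ's implementation; RoundRobinSketch, pick, and distribute are hypothetical names, and servers are represented by their index only.

```java
import java.util.Arrays;

final class RoundRobinSketch {
    private final int serverCount;
    private int next; // index of the server that receives the next request

    RoundRobinSketch(int serverCount) {
        this.serverCount = serverCount;
    }

    // Returns the current server index and advances the cursor, wrapping at the end.
    int pick() {
        int current = next;
        next = (next + 1) % serverCount;
        return current;
    }

    // Routes `requests` requests through a fresh selector and counts hits per server.
    static int[] distribute(int serverCount, int requests) {
        RoundRobinSketch selector = new RoundRobinSketch(serverCount);
        int[] hits = new int[serverCount];
        for (int i = 0; i < requests; i++) {
            hits[selector.pick()]++;
        }
        return hits;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(5, 100))); // prints [20, 20, 20, 20, 20]
    }
}
```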
Round-Robin and First Available Strategies Combined
RPC strategies can be combined. In this example, we combine the Round-Robin and First Available strategies.
First, we create 4 connections but do not put connection3 into the pool. Then we send 20 requests. As a result, the requests are evenly distributed between connection1 (as it is always the first available in its group) and connection4 (as connection3 is not in the pool):
public void roundRobinAndFirstAvailableTest() {
    RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
    RpcSenderStub connection1 = new RpcSenderStub();
    RpcSenderStub connection2 = new RpcSenderStub();
    RpcSenderStub connection3 = new RpcSenderStub();
    RpcSenderStub connection4 = new RpcSenderStub();
    pool.put(address1, connection1);
    pool.put(address2, connection2);
    // we don't put connection3
    pool.put(address4, connection4);
    int iterations = 20;
    RpcStrategy strategy = RpcStrategies.roundRobin(
            firstAvailable(servers(address1, address2)),
            firstAvailable(servers(address3, address4)));

    RpcSender sender = strategy.createSender(pool);
    for (int i = 0; i < iterations; i++) {
        sender.sendRequest(new Object(), 50, assertNoCalls());
    }

    assertEquals(iterations / 2, connection1.getRequests());
    assertEquals(0, connection2.getRequests());
    assertEquals(0, connection3.getRequests());
    assertEquals(iterations / 2, connection4.getRequests());
}
You can find example sources on GitHub
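To see why the counts come out as 10, 0, 0, and 10, the composition can be modelled in plain Java. This is a simplified sketch under stated assumptions: the pool is just a set of address strings, a group with no available address silently drops the request, and CombinedStrategySketch, firstAvailable, and send are hypothetical names rather than ActiveJ API.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class CombinedStrategySketch {
    // First Available: the first address of the group that is present in the pool, or null.
    static String firstAvailable(List<String> group, Set<String> pool) {
        for (String address : group) {
            if (pool.contains(address)) {
                return address;
            }
        }
        return null;
    }

    // Round-Robin over groups: request i goes to group (i mod groupCount),
    // and the group resolves it with firstAvailable.
    static Map<String, Integer> send(List<List<String>> groups, Set<String> pool, int requests) {
        Map<String, Integer> hits = new TreeMap<>();
        for (int i = 0; i < requests; i++) {
            String target = firstAvailable(groups.get(i % groups.size()), pool);
            if (target != null) {
                hits.merge(target, 1, Integer::sum);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        Set<String> pool = Set.of("address1", "address2", "address4"); // address3 is missing
        Map<String, Integer> hits = send(
                List.of(List.of("address1", "address2"), List.of("address3", "address4")),
                pool, 20);
        System.out.println(hits); // prints {address1=10, address4=10}
    }
}
```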
Sharding and First Valid Strategies Combined
You can also create your own sharding functions and combine them with other strategies as needed. In this example, we create 5 identical connections but we do not put connection2 into the pool. Next, we provide a simple sharding function that distributes requests among shards according to the content of the request. We split the connections into two shards and set a First Valid Result strategy for both. This strategy sends the request to all available servers.
Now, we manually send 7 requests:
- 4 with the message 0, so they will be sent to the first shard's connection1
- 3 with the message 1, so each of them will be sent to all three connections of the second shard
public void shardingAndFirstValidTest() {
    RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
    RpcSenderStub connection1 = new RpcSenderStub();
    RpcSenderStub connection2 = new RpcSenderStub();
    RpcSenderStub connection3 = new RpcSenderStub();
    RpcSenderStub connection4 = new RpcSenderStub();
    RpcSenderStub connection5 = new RpcSenderStub();
    pool.put(address1, connection1);
    // we don't put connection2
    pool.put(address3, connection3);
    pool.put(address4, connection4);
    pool.put(address5, connection5);
    int shardsCount = 2;
    RpcStrategy strategy = sharding(
            item -> (Integer) item % shardsCount,
            firstValidResult(servers(address1, address2)),
            firstValidResult(servers(address3, address4, address5)));

    RpcSender sender = strategy.createSender(pool);
    sender.sendRequest(0, 50, assertNoCalls());
    sender.sendRequest(0, 50, assertNoCalls());
    sender.sendRequest(1, 50, assertNoCalls());
    sender.sendRequest(1, 50, assertNoCalls());
    sender.sendRequest(0, 50, assertNoCalls());
    sender.sendRequest(0, 50, assertNoCalls());
    sender.sendRequest(1, 50, assertNoCalls());

    assertEquals(4, connection1.getRequests());
    assertEquals(0, connection2.getRequests());
    assertEquals(3, connection3.getRequests());
    assertEquals(3, connection4.getRequests());
    assertEquals(3, connection5.getRequests());
}
You can find example sources on GitHub
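The request counts asserted above follow from two simple rules, which the plain-Java model below reproduces. It is only a sketch: the pool is a set of address strings, the fan-out ignores responses entirely, and ShardingSketch, sendToAll, and send are hypothetical names that do not exist in ActiveJ.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

final class ShardingSketch {
    // First Valid Result (simplified): fan the request out to every
    // address of the shard that is present in the pool.
    static void sendToAll(List<String> shard, Set<String> pool, Map<String, Integer> hits) {
        for (String address : shard) {
            if (pool.contains(address)) {
                hits.merge(address, 1, Integer::sum);
            }
        }
    }

    // Sharding: the request value modulo the shard count selects the shard.
    static Map<String, Integer> send(List<List<String>> shards, Set<String> pool, int... requests) {
        Map<String, Integer> hits = new TreeMap<>();
        for (int request : requests) {
            int shardIndex = Math.floorMod(request, shards.size());
            sendToAll(shards.get(shardIndex), pool, hits);
        }
        return hits;
    }

    public static void main(String[] args) {
        Set<String> pool = Set.of("address1", "address3", "address4", "address5"); // no address2
        Map<String, Integer> hits = send(
                List.of(List.of("address1", "address2"),
                        List.of("address3", "address4", "address5")),
                pool, 0, 0, 1, 1, 0, 0, 1);
        System.out.println(hits); // prints {address1=4, address3=3, address4=3, address5=3}
    }
}
```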
Rendezvous Hashing Strategy
The Rendezvous Hashing strategy pre-computes the hash function for the RpcSender and creates a map of RPC servers. The map is cached and is recalculated only when servers go online or offline.
In this example, requests are first distributed evenly among the three shards, which are served by connection1, connection3, and connection5:
public void rendezvousHashingTest() {
    RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
    RpcSenderStub connection1 = new RpcSenderStub();
    RpcSenderStub connection2 = new RpcSenderStub();
    RpcSenderStub connection3 = new RpcSenderStub();
    RpcSenderStub connection4 = new RpcSenderStub();
    RpcSenderStub connection5 = new RpcSenderStub();
    RpcStrategy strategy = RendezvousHashing.builder((Integer item) -> item)
            .withShard(1, firstAvailable(servers(address1, address2)))
            .withShard(2, firstAvailable(servers(address3, address4)))
            .withShard(3, server(address5))
            .build();
    int iterationsPerLoop = 1000;
    RpcSender sender;

    pool.put(address1, connection1);
    pool.put(address2, connection2);
    pool.put(address3, connection3);
    pool.put(address4, connection4);
    pool.put(address5, connection5);
    sender = strategy.createSender(pool);
    for (int i = 0; i < iterationsPerLoop; i++) {
        sender.sendRequest(i, 50, ignore());
    }
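When we remove some connections from the pool, the hash function is recalculated. Here, removing address3 and address4 takes the second shard offline, so in the second loop its requests are split between the remaining two shards: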
    pool.remove(address3);
    pool.remove(address4);
    sender = strategy.createSender(pool);
    for (int i = 0; i < iterationsPerLoop; i++) {
        sender.sendRequest(i, 50, ignore());
    }

    double acceptableError = iterationsPerLoop / 10.0;
    assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection1.getRequests(), acceptableError);
    assertEquals(0, connection2.getRequests());
    assertEquals(iterationsPerLoop / 3.0, connection3.getRequests(), acceptableError);
    assertEquals(0, connection4.getRequests());
    assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection5.getRequests(), acceptableError);
}
You can find example sources on GitHub
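The core idea of rendezvous (highest random weight) hashing can be sketched without any library: every key scores each alive shard, and the highest score wins. The sketch below is a generic illustration, not ActiveJ's RendezvousHashing class; it computes the winner per key instead of caching a map, the mixing function in score is an arbitrary choice made for this example, and RendezvousSketch, score, pick, and needlesslyMoved are hypothetical names.

```java
import java.util.List;

final class RendezvousSketch {
    // Mixes key and shard id into one pseudo-random score (arbitrary illustrative hash).
    static int score(int key, int shardId) {
        int h = key * 0x9E3779B9 + shardId * 0x85EBCA6B;
        h ^= h >>> 16;
        h *= 0xC2B2AE35;
        h ^= h >>> 13;
        return h;
    }

    // Highest-random-weight rule: the alive shard with the largest score gets the key.
    static int pick(int key, List<Integer> aliveShards) {
        int best = aliveShards.get(0);
        for (int shard : aliveShards) {
            if (score(key, shard) > score(key, best)) {
                best = shard;
            }
        }
        return best;
    }

    // Counts keys that change shard after shard 2 goes offline
    // even though the shard they were on is still alive.
    static int needlesslyMoved(int keys) {
        int moved = 0;
        for (int key = 0; key < keys; key++) {
            int before = pick(key, List.of(1, 2, 3));
            int after = pick(key, List.of(1, 3));
            if (before != 2 && before != after) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(needlesslyMoved(1000)); // prints 0
    }
}
```

Only keys that belonged to the removed shard are reassigned; every other key keeps its shard, which is the property that makes this scheme attractive when servers go online or offline.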
Type Dispatch Strategy
This strategy distributes requests among shards according to the type of the request. In the example, all String requests are sent to the first shard, which uses a First Valid Result strategy for its servers. Requests of all other types are sent to the second shard, which uses a First Available strategy. As a result, connection1 and connection2 each handle 35 requests, connection3 handles 25 requests, and connection4 and connection5 handle 0 requests because connection3 is always the first available:
public void typeDispatchTest() {
    RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
    RpcSenderStub connection1 = new RpcSenderStub();
    RpcSenderStub connection2 = new RpcSenderStub();
    RpcSenderStub connection3 = new RpcSenderStub();
    RpcSenderStub connection4 = new RpcSenderStub();
    RpcSenderStub connection5 = new RpcSenderStub();
    pool.put(address1, connection1);
    pool.put(address2, connection2);
    pool.put(address3, connection3);
    pool.put(address4, connection4);
    pool.put(address5, connection5);
    int timeout = 50;
    int iterationsPerDataStub = 25;
    int iterationsPerDataStubWithKey = 35;
    RpcSender sender;
    RpcStrategy strategy = TypeDispatching.builder()
            .with(String.class,
                    firstValidResult(servers(address1, address2)))
            .withDefault(
                    firstAvailable(servers(address3, address4, address5)))
            .build();

    sender = strategy.createSender(pool);
    for (int i = 0; i < iterationsPerDataStub; i++) {
        sender.sendRequest(new Object(), timeout, assertNoCalls());
    }
    for (int i = 0; i < iterationsPerDataStubWithKey; i++) {
        sender.sendRequest("request", timeout, assertNoCalls());
    }

    assertEquals(iterationsPerDataStubWithKey, connection1.getRequests());
    assertEquals(iterationsPerDataStubWithKey, connection2.getRequests());
    assertEquals(iterationsPerDataStub, connection3.getRequests());
    assertEquals(0, connection4.getRequests());
    assertEquals(0, connection5.getRequests());
}
You can find example sources on GitHub
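Dispatching by request type amounts to a map lookup with a fallback. The sketch below illustrates that idea in plain Java, assuming exact-class matching (subclasses of a registered type fall through to the default); TypeDispatchSketch and its methods are hypothetical names, and routes are plain strings rather than real strategies.

```java
import java.util.HashMap;
import java.util.Map;

final class TypeDispatchSketch {
    private final Map<Class<?>, String> routes = new HashMap<>();
    private final String defaultRoute;

    TypeDispatchSketch(String defaultRoute) {
        this.defaultRoute = defaultRoute;
    }

    // Registers a route for one request type and returns this for chaining.
    TypeDispatchSketch with(Class<?> type, String route) {
        routes.put(type, route);
        return this;
    }

    // Exact-class lookup; any unregistered type falls through to the default route.
    String route(Object request) {
        return routes.getOrDefault(request.getClass(), defaultRoute);
    }

    public static void main(String[] args) {
        TypeDispatchSketch dispatcher = new TypeDispatchSketch("second shard")
                .with(String.class, "first shard");
        System.out.println(dispatcher.route("request"));    // prints "first shard"
        System.out.println(dispatcher.route(new Object())); // prints "second shard"
    }
}
```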