Skip to main content

Basic Examples

note

To run the examples, you need to clone ActiveJ from GitHub

git clone https://github.com/activej/activej

And import it as a Maven project. Check out tag v4.3. Before running the examples, build the project. Simple RPC Example are located at activej/examples/cloud/rpc

RPC strategies examples are located at activej/cloud-rpc/src/test/RpcStrategiesTest

Simple RPC Example#

In the "Hello World" client and server RPC Example, the client sends a request which contains the word "World" to server. When server receives it, it sends a response that contains the word "Hello ". If everything completes successfully, we get the following output:

Got result: Hello World

Let's take a look at the implementation:

public class RpcExample extends Launcher {
private static final int SERVICE_PORT = 34765;
@Inject
private RpcClient client;
@Inject
private RpcServer server;
@Inject
private Eventloop eventloop;
@Provides
Eventloop eventloop() {
return Eventloop.create();
}
@Provides
RpcServer rpcServer(Eventloop eventloop) {
return RpcServer.create(eventloop)
.withMessageTypes(String.class)
.withHandler(String.class,
request -> Promise.of("Hello " + request))
.withListenPort(SERVICE_PORT);
}
@Provides
RpcClient rpcClient(Eventloop eventloop) {
return RpcClient.create(eventloop)
.withMessageTypes(String.class)
.withStrategy(server(new InetSocketAddress(SERVICE_PORT)));
}
@ProvidesIntoSet
Initializer<ServiceGraphModuleSettings> configureServiceGraph() {
// add logical dependency so that service graph starts client only after it started the server
return settings -> settings.addDependency(Key.of(RpcClient.class), Key.of(RpcServer.class));
}
@Override
protected Module getModule() {
return ServiceGraphModule.create();
}
@Override
protected void run() throws ExecutionException, InterruptedException {
CompletableFuture<Object> future = eventloop.submit(() ->
client.sendRequest("World", 1000)
);
System.out.printf("%nRPC result: %s %n%n", future.get());
}
public static void main(String[] args) throws Exception {
RpcExample example = new RpcExample();
example.launch(args);
}
}

RpcExample class extends ActiveJ Launcher to help us manage application lifecycle.

Next, we use Dependency Injection library ActiveJ Inject to provide RpcServer and RpcClient with relevant configurations and required dependencies. RpcClient sends requests with a String message to the specified server according to the provided RPC strategy (getting a single RPC-service). For RpcServer we define the type of messages to proceed, a corresponding RpcRequestHandler and a listener port.

Since we extend Launcher, we will also override 2 methods: getModule to provide ServiceGraphModule and run to describe the main logic of the example.

Finally, we define the main method, which will launch our example.

You can find example sources on GitHub

Round-Robin Strategy#

ActiveJ RPC contains pre-defined strategies for requests arrangement between RPC servers or shards of servers. Round-Robin is one of the simplest of strategies: it just goes through the servers or shards in a cyclic way one by one.

In this example we create an RPC pool with 5 equal connections and set Round-Robin strategy for them. Next, we create a sender for the pool with the previously defined strategy. That's it, 100 requests will be equally distributed between the servers:

public void roundRobinTest() {
RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
RpcSenderStub connection1 = new RpcSenderStub();
RpcSenderStub connection2 = new RpcSenderStub();
RpcSenderStub connection3 = new RpcSenderStub();
RpcSenderStub connection4 = new RpcSenderStub();
RpcSenderStub connection5 = new RpcSenderStub();
pool.put(address1, connection1);
pool.put(address2, connection2);
pool.put(address3, connection3);
pool.put(address4, connection4);
pool.put(address5, connection5);
int iterations = 100;
RpcStrategy strategy = roundRobin(servers(address1, address2, address3, address4, address5));
RpcSender sender = strategy.createSender(pool);
for (int i = 0; i < iterations; i++) {
sender.sendRequest(new Object(), 50, ignore());
}
List<RpcSenderStub> connections =
asList(connection1, connection2, connection3, connection4, connection5);
for (int i = 0; i < 5; i++) {
assertEquals(iterations / 5, connections.get(i).getRequests());
}
}

You can find example sources on GitHub

Round-Robin and First Available Strategies Combined#

You can simply combine RPC strategies. In this example we will combine Round Robin and First Available strategies.

First, we create 4 connections without putting connection3 into the pool. Then we start sending 20 requests. As a result, all the requests will be equally distributed between connection1 (as it is always first available) and connection4 (as connection3 isn't available for the pool):

public void roundRobinAndFirstAvailableTest() {
RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
RpcSenderStub connection1 = new RpcSenderStub();
RpcSenderStub connection2 = new RpcSenderStub();
RpcSenderStub connection3 = new RpcSenderStub();
RpcSenderStub connection4 = new RpcSenderStub();
pool.put(address1, connection1);
pool.put(address2, connection2);
// we don't put connection3
pool.put(address4, connection4);
int iterations = 20;
RpcStrategy strategy = roundRobin(
firstAvailable(servers(address1, address2)),
firstAvailable(servers(address3, address4)));
RpcSender sender = strategy.createSender(pool);
for (int i = 0; i < iterations; i++) {
sender.sendRequest(new Object(), 50, assertNoCalls());
}
assertEquals(iterations / 2, connection1.getRequests());
assertEquals(0, connection2.getRequests());
assertEquals(0, connection3.getRequests());
assertEquals(iterations / 2, connection4.getRequests());
}

You can find example sources on GitHub

Sharding and First Valid Strategies Combined#

You can also create your own sharding functions and combine them with other strategies if needed. In this example we create 5 equal connections but don't put connection2 into the pool. Next, we provide a simple sharding function which distributes requests between shards in accordance to the content of the request. We split the connections into two shards, and set First Valid Result strategy for both of them. This strategy sends request to all available servers.

Now, we manually send 7 requests:

  • 4 with 0 message, so they'll be sent to the first shard's connection1
  • 3 with 1, so they'll all be sent to all three connections of the second shard
public void shardingAndFirstValidTest() {
RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
RpcSenderStub connection1 = new RpcSenderStub();
RpcSenderStub connection2 = new RpcSenderStub();
RpcSenderStub connection3 = new RpcSenderStub();
RpcSenderStub connection4 = new RpcSenderStub();
RpcSenderStub connection5 = new RpcSenderStub();
pool.put(address1, connection1);
// we don't put connection2
pool.put(address3, connection3);
pool.put(address4, connection4);
pool.put(address5, connection5);
int shardsCount = 2;
ShardingFunction<Integer> shardingFunction = item -> item % shardsCount;
RpcStrategy strategy = sharding(shardingFunction,
firstValidResult(servers(address1, address2)),
firstValidResult(servers(address3, address4, address5)));
RpcSender sender = strategy.createSender(pool);
sender.sendRequest(0, 50, assertNoCalls());
sender.sendRequest(0, 50, assertNoCalls());
sender.sendRequest(1, 50, assertNoCalls());
sender.sendRequest(1, 50, assertNoCalls());
sender.sendRequest(0, 50, assertNoCalls());
sender.sendRequest(0, 50, assertNoCalls());
sender.sendRequest(1, 50, assertNoCalls());
assertEquals(4, connection1.getRequests());
assertEquals(0, connection2.getRequests());
assertEquals(3, connection3.getRequests());
assertEquals(3, connection4.getRequests());
assertEquals(3, connection5.getRequests());
}

You can find example sources on GitHub

Rendezvous Hashing Strategy#

Rendezvous hashing strategy pre-calculates the hash function for the RpcSender and creates a map of RPC servers. The map is stored in cache and will be re-calculated only if servers go online/offline.

In this example requests will be equally distributed between connection1, connection2, and connection3:

public void rendezvousHashingTest() {
RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
RpcSenderStub connection1 = new RpcSenderStub();
RpcSenderStub connection2 = new RpcSenderStub();
RpcSenderStub connection3 = new RpcSenderStub();
RpcSenderStub connection4 = new RpcSenderStub();
RpcSenderStub connection5 = new RpcSenderStub();
HashFunction<Integer> hashFunction = item -> item;
RpcStrategy strategy = rendezvousHashing(hashFunction)
.withShard(1, firstAvailable(servers(address1, address2)))
.withShard(2, firstAvailable(servers(address3, address4)))
.withShard(3, server(address5));
int iterationsPerLoop = 1000;
RpcSender sender;
pool.put(address1, connection1);
pool.put(address2, connection2);
pool.put(address3, connection3);
pool.put(address4, connection4);
pool.put(address5, connection5);
sender = strategy.createSender(pool);
for (int i = 0; i < iterationsPerLoop; i++) {
sender.sendRequest(i, 50, ignore());
}

When we remove some of the connections from the pool, hash function is recalculated:

pool.remove(address3);
pool.remove(address4);
sender = strategy.createSender(pool);
for (int i = 0; i < iterationsPerLoop; i++) {
sender.sendRequest(i, 50, ignore());
}
double acceptableError = iterationsPerLoop / 10.0;
assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection1.getRequests(), acceptableError);
assertEquals(0, connection2.getRequests());
assertEquals(iterationsPerLoop / 3.0, connection3.getRequests(), acceptableError);
assertEquals(0, connection4.getRequests());
assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection5.getRequests(), acceptableError);
}

You can find example sources on GitHub

Type Dispatch Strategy#

This strategy simply distributes requests among shards in accordance to the type of the request. In the example all String requests are sent on the first shard which has First Valid Result strategy for the servers. Requests with all other types are sent to the second shard with First Available strategy. As a result, connection1 and connection2 will process 35 requests, connection3 - 25 requests, while connection4 and connection5 - 0 requests as connection3 was always First Available:

public void typeDispatchTest() {
RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();
RpcSenderStub connection1 = new RpcSenderStub();
RpcSenderStub connection2 = new RpcSenderStub();
RpcSenderStub connection3 = new RpcSenderStub();
RpcSenderStub connection4 = new RpcSenderStub();
RpcSenderStub connection5 = new RpcSenderStub();
pool.put(address1, connection1);
pool.put(address2, connection2);
pool.put(address3, connection3);
pool.put(address4, connection4);
pool.put(address5, connection5);
int timeout = 50;
int iterationsPerDataStub = 25;
int iterationsPerDataStubWithKey = 35;
RpcSender sender;
RpcStrategy strategy = typeDispatching()
.on(String.class,
firstValidResult(servers(address1, address2)))
.onDefault(
firstAvailable(servers(address3, address4, address5)));
sender = strategy.createSender(pool);
for (int i = 0; i < iterationsPerDataStub; i++) {
sender.sendRequest(new Object(), timeout, assertNoCalls());
}
for (int i = 0; i < iterationsPerDataStubWithKey; i++) {
sender.sendRequest("request", timeout, assertNoCalls());
}
assertEquals(iterationsPerDataStubWithKey, connection1.getRequests());
assertEquals(iterationsPerDataStubWithKey, connection2.getRequests());
assertEquals(iterationsPerDataStub, connection3.getRequests());
assertEquals(0, connection4.getRequests());
assertEquals(0, connection5.getRequests());
}

You can find example sources on GitHub