Key-Value Storage
In this tutorial, we will look at a remote key-value storage that has 2 basic operations: put
and get
. When
writing distributed applications, the question often arises about which protocol to use for communication. There are two main
options:
- HTTP/REST
- RPC
Although HTTP is more popular and well specified, it has some overhead. When performance is an important aspect of an application, you need to use something faster. ActiveJ RPC was developed for this purpose, based on fast serializers and an optimized communication protocol, which significantly improves application performance.
What you will need:
- IDE or terminal
- JDK 17+
- Maven 3.0+
To proceed with this guide you have need to download and run a working example
Working Example
First of all, clone ActiveJ project locally:
git clone -b examples-6.0-beta2 https://github.com/activej/activej.git
Then open the project in your IDE of choice. Before running the example, build the project (Ctrl + F9 for IntelliJ IDEA).
If you want to skip source code explanation, you can go straight to testing section.
Source code
Basic app functionality
Since we need to implement two basic operations (put
and get
), let's start with the message classes that are used to communicate between client and server, namely PutRequest, PutResponse, GetRequest, and GetResponse.
Instances of these classes are serialized using a lightning-fast serializer library ActiveJ Serializer. Since all these classes are Java records we can add @SerializeRecord
annotation on top of each class in order to instruct ActiveJ Serializer how to serialize/deserialize classes.
Message classes look like these:
@SerializeRecord
public record PutRequest(String key, String value) {}
@SerializeRecord
public record PutResponse(@SerializeNullable String previousValue) {}
@SerializeRecord
public record GetRequest(String key) {}
@SerializeRecord
public record GetResponse(@SerializeNullable String value) {}
Next, let's look at a simple implementation of a key-value storage, the KeyValueStore
public class KeyValueStore {
private final Map<String, String> store = new HashMap<>();
public String put(String key, String value) {
return store.put(key, value);
}
public String get(String key) {
return store.get(key);
}
}
A key-value store is very minimalistic. It uses a regular Java's Map
to store keys with corresponding values.
Client and server
Now let's look at the AbstractModule for the RPC server, which uses the ActiveJ Inject dependency injection library to handle get
and put
requests.
public class ServerModule extends AbstractModule {
private static final int RPC_SERVER_PORT = 5353;
@Provides
NioReactor reactor() {
return Eventloop.builder()
.withFatalErrorHandler(rethrow())
.build();
}
@Provides
KeyValueStore keyValueStore() {
return new KeyValueStore();
}
@Provides
RpcServer rpcServer(NioReactor reactor, KeyValueStore store) {
return RpcServer.builder(reactor)
.withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
.withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.key(), req.value()))))
.withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.key()))))
.withListenPort(RPC_SERVER_PORT)
.build();
}
}
As you can see, to properly define the RpcServer, we specified all the message classes that are sent between client and server, and specified the corresponding RpcRequestHandler for each request class.
We have specified them as the second arguments in these lines, using lambdas.
.withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.key(), req.value()))))
.withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.key()))))
We also need to specify message types (PutRequest
, PutResponse
, GetRequest
, GetResponse
) that will be used for
RPC communication:
...
.withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
Order of message types matters and should match on server and client side.
Next, let's look at the ServerLauncher
for the RPC server. The ActiveJ Launcher is used here to manage the lifecycle of the application:
public class ServerLauncher extends Launcher {
@Inject
private RpcServer server;
@Override
protected Module getModule() {
return combine(
ServiceGraphModule.create(),
new ServerModule());
}
@Override
protected void run() throws Exception {
awaitShutdown();
}
public static void main(String[] args) throws Exception {
ServerLauncher launcher = new ServerLauncher();
launcher.launch(args);
}
}
Since we extended Launcher class, we also had to override 2 methods: getModule() to provide ServiceGraphModule and run() to describe the main logic of the example.
Now let's look at the RPC client. We have once again specified all the message classes that are used for communication, and we have specified the RPC strategy. All strategies can be combined, but since we have only one server, we use a single-server strategy:
public class ClientModule extends AbstractModule {
private static final int RPC_SERVER_PORT = 5353;
@Provides
NioReactor reactor() {
return Eventloop.builder()
.withFatalErrorHandler(rethrow())
.withCurrentThread()
.build();
}
@Provides
IRpcClient rpcClient(NioReactor reactor) {
return RpcClient.builder(reactor)
.withConnectTimeout(Duration.ofSeconds(1))
.withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
.withStrategy(server(new InetSocketAddress("localhost", RPC_SERVER_PORT)))
.build();
}
}
Let's also take a look at the ClientLauncher. In the run()
method, we are parsing command line arguments and make appropriate requests to the RpcServer
public class ClientLauncher extends Launcher {
private static final int TIMEOUT = 1000;
@Inject
private IRpcClient client;
@Inject
Reactor reactor;
@Override
protected Module getModule() {
return combine(
ServiceGraphModule.create(),
new ClientModule());
}
@Override
protected void run() throws Exception {
if (args.length < 2) {
System.err.println("Command line args:\n\t--put key value\n\t--get key");
return;
}
switch (args[0]) {
case "--put" -> {
CompletableFuture<PutResponse> future1 = reactor.submit(() ->
client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT)
);
PutResponse putResponse = future1.get();
System.out.println("PutResponse: " + putResponse);
}
case "--get" -> {
CompletableFuture<GetResponse> future2 = reactor.submit(() ->
client.sendRequest(new GetRequest(args[1]), TIMEOUT)
);
GetResponse getResponse = future2.get();
System.out.println("GetResponse: " + getResponse);
}
default -> throw new RuntimeException("Unsupported option: " + args[0]);
}
}
public static void main(String[] args) throws Exception {
ClientLauncher launcher = new ClientLauncher();
launcher.launch(args);
}
}
Congratulations! We have finished exploring the code of this application.
Testing
First, launch the server. Open the ServerLauncher
class and run its main()
method.
Then make a PUT
request. Open ClientLauncher
class which is located at activej/examples/tutorials/rpc-kv-storage
,
and set the program arguments to --put key1 value1
. For IntelliJ IDEA: Run -> Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --put key1 value1||
. Then run the main()
method of a launcher.
You will see the following output:
PutResponse: {previousValue='null'}
Finally, make a GET
request.
Open the ClientLauncher
class once again and set the program arguments to --get key1
. For IntelliJ IDEA: Run -> Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --get key1||
. Then run the main()
method of a launcher.
You will see the following output:
GetResponse: {value='value1'}
Congratulations, you have just launched a remote key-value storage based on the RPC communication protocol!