Key-Value Storage
In this tutorial, we will look at a remote key-value storage that has 2 basic operations: put
and get
. When
writing distributed applications, the question often arises about which protocol to use for communication. There are two main
options:
- HTTP/REST
- RPC
Although HTTP is more popular and well specified, it has some overhead. When performance is an important aspect of an application, you need to use something faster. ActiveJ RPC was developed for this purpose, based on fast serializers and an optimized communication protocol, which significantly improves application performance.
What you will need:
- IDE or terminal
- JDK 1.8+
- Maven 3.0+
To proceed with this guide you have need to download and run a working example
Working Example
First of all, clone ActiveJ project locally:
git clone -b examples-5.3 https://github.com/activej/activej.git
Then open the project in your IDE of choice. Before running the example, build the project (Ctrl + F9 for IntelliJ IDEA).
If you want to skip source code explanation, you can go straight to testing section.
Source code
Basic app functionality
Since we need to implement two basic operations (put
and get
), let's start with the message classes that are used to communicate between client and server, namely PutRequest, PutResponse, GetRequest, and GetResponse.
Instances of these classes are serialized using a lightning-fast serializer library ActiveJ Serializer. It requires some meta information about these classes, which is provided by the corresponding annotations. The basic rules are as follows:
- Use @Serialize annotation on properties that need to be serialized.
- Use @Deserialize annotation with a property name (which should be the same as the one in the getter) in constructor.
- Use @SerializeNullable on properties that can have
null
values.
Message classes look like these:
public class PutRequest {
private final String key; private final String value;
public PutRequest(@Deserialize("key") String key, @Deserialize("value") String value) { this.key = key; this.value = value; }
@Serialize public String getKey() { return key; }
@Serialize public String getValue() { return value; }}
public class PutResponse { private final String previousValue;
public PutResponse(@Deserialize("previousValue") String previousValue) { this.previousValue = previousValue; }
@Serialize @SerializeNullable public String getPreviousValue() { return previousValue; }
@Override public String toString() { return "{previousValue='" + previousValue + '\'' + '}'; }}
public class GetRequest {
private final String key;
public GetRequest(@Deserialize("key") String key) { this.key = key; }
@Serialize public String getKey() { return key; }}
public class GetResponse { private final String value;
public GetResponse(@Deserialize("value") String value) { this.value = value; }
@Serialize @SerializeNullable public String getValue() { return value; }
@Override public String toString() { return "{value='" + value + '\'' + '}'; }}
Next, let's look at a simple implementation of a key-value storage, the KeyValueStore
public class KeyValueStore {
private final Map<String, String> store = new HashMap<>();
public String put(String key, String value) { return store.put(key, value); }
public String get(String key) { return store.get(key); }}
A key-value store is very minimalistic. It uses a regular Java's Map
to store keys with corresponding values.
Client and server
Now let's look at the AbstractModule for the RPC server, which uses the ActiveJ Inject dependency injection library to handle get
and put
requests.
public class ServerModule extends AbstractModule { private static final int RPC_SERVER_PORT = 5353;
@Provides Eventloop eventloop() { return Eventloop.create() .withEventloopFatalErrorHandler(rethrow()); }
@Provides KeyValueStore keyValueStore() { return new KeyValueStore(); }
@Provides RpcServer rpcServer(Eventloop eventloop, KeyValueStore store) { return RpcServer.create(eventloop) .withSerializerBuilder(SerializerBuilder.create()) .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class) .withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue())))) .withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey())))) .withListenPort(RPC_SERVER_PORT); }}
As you can see, to properly define the RpcServer, we specified all the message classes that are sent between client and server, and specified the corresponding RpcRequestHandler for each request class.
We have specified them as the second arguments in these lines, using lambdas.
.withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue())))).withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))
Next, let's look at the ServerLauncher
for the RPC server. The ActiveJ Launcher is used here to manage the lifecycle of the application:
public class ServerLauncher extends Launcher { @Inject private RpcServer server;
@Override protected Module getModule() { return combine( ServiceGraphModule.create(), new ServerModule()); }
@Override protected void run() throws Exception { awaitShutdown(); }
public static void main(String[] args) throws Exception { ServerLauncher launcher = new ServerLauncher(); launcher.launch(args); }}
Since we extended Launcher class, we also had to override 2 methods: getModule() to provide ServiceGraphModule and run() to describe the main logic of the example.
Now let's look at the RPC client. We have once again specified all the message classes that are used for communication, and we have specified the RPC strategy. All strategies can be combined, but since we have only one server, we use a single-server strategy:
public class ClientModule extends AbstractModule { private static final int RPC_SERVER_PORT = 5353;
@Provides Eventloop eventloop() { return Eventloop.create() .withEventloopFatalErrorHandler(rethrow()) .withCurrentThread(); }
@Provides RpcClient rpcClient(Eventloop eventloop) { return RpcClient.create(eventloop) .withConnectTimeout(Duration.ofSeconds(1)) .withSerializerBuilder(SerializerBuilder.create()) .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class) .withStrategy(RpcStrategies.server(new InetSocketAddress("localhost", RPC_SERVER_PORT))); }}
Let's also take a look at the ClientLauncher. In the run()
method, we are parsing command line arguments and make appropriate requests to the RpcServer
public class ClientLauncher extends Launcher { private static final int TIMEOUT = 1000;
@Inject private RpcClient client;
@Inject Eventloop eventloop;
@Override protected Module getModule() { return combine( ServiceGraphModule.create(), new ClientModule()); }
@Override protected void run() throws Exception { if (args.length < 2) { System.err.println("Command line args:\n\t--put key value\n\t--get key"); return; }
switch (args[0]) { case "--put": CompletableFuture<PutResponse> future1 = eventloop.submit(() -> client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT) ); PutResponse putResponse = future1.get(); System.out.println("PutResponse: " + putResponse); break; case "--get": CompletableFuture<GetResponse> future2 = eventloop.submit(() -> client.sendRequest(new GetRequest(args[1]), TIMEOUT) ); GetResponse getResponse = future2.get(); System.out.println("GetResponse: " + getResponse); break; default: throw new RuntimeException("Unsupported option: " + args[0]); } }
public static void main(String[] args) throws Exception { ClientLauncher launcher = new ClientLauncher(); launcher.launch(args); }}
Congratulations! We have finished exploring the code of this application.
Testing
First, launch the server. Open the ServerLauncher
class and run its main()
method.
Then make a PUT
request. Open ClientLauncher
class which is located at activej/examples/tutorials/rpc-kv-storage
,
and set the program arguments to --put key1 value1
. For IntelliJ IDEA: Run -> Edit configurations ->
|Run/Debug Configurations -> |Program arguments -> --put key1 value1||
. Then run the main()
method of a launcher.
You will see the following output:
PutResponse: {previousValue='null'}
Finally, make a GET
request.
Open the ClientLauncher
class once again and set the program arguments to --get key1
. For IntelliJ IDEA: Run ->
Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --get key1||
. Then run the main()
method of a launcher.
You will see the following output:
GetResponse: {value='value1'}
Congratulations, you have just launched a remote key-value storage based on the RPC communication protocol!