Key-Value Storage

In this tutorial, we will look at a remote key-value storage that has two basic operations: put and get. When writing distributed applications, the question of which protocol to use for communication often arises. There are two main options:

  • HTTP/REST
  • RPC

Although HTTP is more popular and well specified, it has some overhead. When performance is an important aspect of an application, you need to use something faster. ActiveJ RPC was developed for this purpose, based on fast serializers and an optimized communication protocol, which significantly improves application performance.

What you will need:

  • IDE or terminal
  • JDK 1.8+
  • Maven 3.0+

To proceed with this guide, you need to download and run the working example.

Working Example

First, clone the ActiveJ project locally:

git clone -b examples-5.3 https://github.com/activej/activej.git

Then open the project in your IDE of choice. Before running the example, build the project (Ctrl + F9 for IntelliJ IDEA).

If you want to skip the source code explanation, you can go straight to the Testing section.

Source code

Basic app functionality

Since we need to implement two basic operations (put and get), let's start with the message classes that are used to communicate between client and server, namely PutRequest, PutResponse, GetRequest, and GetResponse. Instances of these classes are serialized using ActiveJ Serializer, a fast serialization library. It requires some meta information about these classes, which is provided by the corresponding annotations. The basic rules are as follows:

  • Use the @Serialize annotation on the getters of properties that need to be serialized.
  • Use the @Deserialize annotation with a property name (which must match the name derived from the getter) on each constructor parameter.
  • Use the @SerializeNullable annotation on properties that can have null values.

The message classes look like this:

public class PutRequest {
  private final String key;
  private final String value;

  public PutRequest(@Deserialize("key") String key, @Deserialize("value") String value) {
    this.key = key;
    this.value = value;
  }

  @Serialize
  public String getKey() {
    return key;
  }

  @Serialize
  public String getValue() {
    return value;
  }
}

public class PutResponse {
  private final String previousValue;

  public PutResponse(@Deserialize("previousValue") String previousValue) {
    this.previousValue = previousValue;
  }

  @Serialize
  @SerializeNullable
  public String getPreviousValue() {
    return previousValue;
  }

  @Override
  public String toString() {
    return "{previousValue='" + previousValue + '\'' + '}';
  }
}

public class GetRequest {
  private final String key;

  public GetRequest(@Deserialize("key") String key) {
    this.key = key;
  }

  @Serialize
  public String getKey() {
    return key;
  }
}

public class GetResponse {
  private final String value;

  public GetResponse(@Deserialize("value") String value) {
    this.value = value;
  }

  @Serialize
  @SerializeNullable
  public String getValue() {
    return value;
  }

  @Override
  public String toString() {
    return "{value='" + value + '\'' + '}';
  }
}
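
Both response classes mark their getter with @SerializeNullable because a put on a new key and a get on a missing key both produce null. To give an intuition for why a binary serializer needs to be told about nullability, here is a minimal plain-Java sketch that encodes an optional string with a one-byte presence flag. This is an illustration only and is not ActiveJ Serializer's actual wire format; the class and method names (NullableEncodingSketch, encodeNullable, decodeNullable) are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class NullableEncodingSketch {
    // Writes a one-byte presence flag, then the string itself only if it is present.
    static byte[] encodeNullable(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(value != null);
            if (value != null) {
                out.writeUTF(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the presence flag first and only then decides whether a string follows.
    static String decodeNullable(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readBoolean() ? in.readUTF() : null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(decodeNullable(encodeNullable("value1")));
        System.out.println(decodeNullable(encodeNullable(null)));
    }
}
```

Without the flag, the decoder would have no way to tell an absent value from the start of a string, which is why the nullable case has to be declared up front.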

Next, let's look at a simple implementation of the key-value storage, KeyValueStore:

public class KeyValueStore {
  private final Map<String, String> store = new HashMap<>();

  public String put(String key, String value) {
    return store.put(key, value);
  }

  public String get(String key) {
    return store.get(key);
  }
}

The key-value store is minimalistic: it uses a regular Java Map (a HashMap) to store keys with their corresponding values.
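
Because put delegates to Map.put, it returns the value previously stored under the key, or null if there was none. That return value is what later ends up in PutResponse. The following self-contained sketch repeats the store as a nested class (KeyValueStoreDemo is a hypothetical wrapper name) and shows the return values for a short sequence of calls:

```java
import java.util.HashMap;
import java.util.Map;

public class KeyValueStoreDemo {
    // Same shape as the tutorial's KeyValueStore, nested here to keep the example self-contained.
    static final class KeyValueStore {
        private final Map<String, String> store = new HashMap<>();

        String put(String key, String value) {
            return store.put(key, value); // previous value, or null if the key was absent
        }

        String get(String key) {
            return store.get(key); // null for an unknown key
        }
    }

    public static void main(String[] args) {
        KeyValueStore kv = new KeyValueStore();
        System.out.println(kv.put("key1", "value1")); // null (no previous value)
        System.out.println(kv.put("key1", "value2")); // value1
        System.out.println(kv.get("key1"));           // value2
        System.out.println(kv.get("missing"));        // null
    }
}
```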

Client and server

Now let's look at the AbstractModule for the RPC server. It uses the ActiveJ Inject dependency injection library to provide the Eventloop, the KeyValueStore, and the RpcServer that handles get and put requests.

public class ServerModule extends AbstractModule {
  private static final int RPC_SERVER_PORT = 5353;

  @Provides
  Eventloop eventloop() {
    return Eventloop.create()
        .withEventloopFatalErrorHandler(rethrow());
  }

  @Provides
  KeyValueStore keyValueStore() {
    return new KeyValueStore();
  }

  @Provides
  RpcServer rpcServer(Eventloop eventloop, KeyValueStore store) {
    return RpcServer.create(eventloop)
        .withSerializerBuilder(SerializerBuilder.create())
        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
        .withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))
        .withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))
        .withListenPort(RPC_SERVER_PORT);
  }
}

As you can see, to properly define the RpcServer, we specified all the message classes that are sent between client and server, and specified the corresponding RpcRequestHandler for each request class.

The handlers are passed as lambdas in the second argument of each withHandler call:

.withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))
.withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))
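
One way to picture what registering a handler per request class achieves is a lookup table from request class to handler function. The plain-Java sketch below illustrates that idea only; it is not how RpcServer is implemented. It uses CompletableFuture as a stand-in for Promise, and all names in it (HandlerDispatchSketch, Put, Get, dispatch, demoServer) are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class HandlerDispatchSketch {
    // Hypothetical request types, standing in for PutRequest and GetRequest.
    static final class Put {
        final String key;
        final String value;
        Put(String key, String value) { this.key = key; this.value = value; }
    }

    static final class Get {
        final String key;
        Get(String key) { this.key = key; }
    }

    // Lookup table: request class -> handler producing an asynchronous response.
    private final Map<Class<?>, Function<Object, CompletableFuture<Object>>> handlers = new HashMap<>();

    <I> HandlerDispatchSketch withHandler(Class<I> requestClass,
                                          Function<I, CompletableFuture<Object>> handler) {
        // The cast cannot fail: dispatch() selects this entry only for instances of requestClass.
        handlers.put(requestClass, request -> handler.apply(requestClass.cast(request)));
        return this;
    }

    CompletableFuture<Object> dispatch(Object request) {
        Function<Object, CompletableFuture<Object>> handler = handlers.get(request.getClass());
        if (handler == null) {
            CompletableFuture<Object> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalArgumentException("No handler for " + request.getClass().getName()));
            return failed;
        }
        return handler.apply(request);
    }

    // Registers one handler per request type, mirroring the two withHandler calls above.
    static HandlerDispatchSketch demoServer() {
        Map<String, String> store = new HashMap<>();
        return new HandlerDispatchSketch()
            .withHandler(Put.class, req -> CompletableFuture.completedFuture(store.put(req.key, req.value)))
            .withHandler(Get.class, req -> CompletableFuture.completedFuture(store.get(req.key)));
    }

    public static void main(String[] args) {
        HandlerDispatchSketch server = demoServer();
        System.out.println(server.dispatch(new Put("key1", "value1")).join()); // null
        System.out.println(server.dispatch(new Get("key1")).join());           // value1
    }
}
```

A request of an unregistered type yields a failed future in this sketch, which is one reason every message type has to be declared on both sides before any traffic is exchanged.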

Next, let's look at the ServerLauncher for the RPC server. The ActiveJ Launcher is used here to manage the lifecycle of the application:

public class ServerLauncher extends Launcher {
  @Inject
  private RpcServer server;

  @Override
  protected Module getModule() {
    return combine(
        ServiceGraphModule.create(),
        new ServerModule());
  }

  @Override
  protected void run() throws Exception {
    awaitShutdown();
  }

  public static void main(String[] args) throws Exception {
    ServerLauncher launcher = new ServerLauncher();
    launcher.launch(args);
  }
}

Since we extended the Launcher class, we had to override two methods: getModule(), which combines the ServiceGraphModule with our ServerModule, and run(), which describes the main logic of the example (here, it simply waits for shutdown).

Now let's look at the RPC client. We have once again specified all the message classes that are used for communication, and we have specified the RPC strategy. All strategies can be combined, but since we have only one server, we use a single-server strategy:

public class ClientModule extends AbstractModule {
  private static final int RPC_SERVER_PORT = 5353;

  @Provides
  Eventloop eventloop() {
    return Eventloop.create()
        .withEventloopFatalErrorHandler(rethrow())
        .withCurrentThread();
  }

  @Provides
  RpcClient rpcClient(Eventloop eventloop) {
    return RpcClient.create(eventloop)
        .withConnectTimeout(Duration.ofSeconds(1))
        .withSerializerBuilder(SerializerBuilder.create())
        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)
        .withStrategy(RpcStrategies.server(new InetSocketAddress("localhost", RPC_SERVER_PORT)));
  }
}

Let's also take a look at the ClientLauncher. In the run() method, we parse the command-line arguments and make the appropriate requests to the RpcServer:

public class ClientLauncher extends Launcher {
  private static final int TIMEOUT = 1000;

  @Inject
  private RpcClient client;

  @Inject
  Eventloop eventloop;

  @Override
  protected Module getModule() {
    return combine(
        ServiceGraphModule.create(),
        new ClientModule());
  }

  @Override
  protected void run() throws Exception {
    if (args.length < 2) {
      System.err.println("Command line args:\n\t--put key value\n\t--get key");
      return;
    }

    switch (args[0]) {
      case "--put":
        CompletableFuture<PutResponse> future1 = eventloop.submit(() ->
            client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT)
        );
        PutResponse putResponse = future1.get();
        System.out.println("PutResponse: " + putResponse);
        break;
      case "--get":
        CompletableFuture<GetResponse> future2 = eventloop.submit(() ->
            client.sendRequest(new GetRequest(args[1]), TIMEOUT)
        );
        GetResponse getResponse = future2.get();
        System.out.println("GetResponse: " + getResponse);
        break;
      default:
        throw new RuntimeException("Unsupported option: " + args[0]);
    }
  }

  public static void main(String[] args) throws Exception {
    ClientLauncher launcher = new ClientLauncher();
    launcher.launch(args);
  }
}
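
Note that the length check in run() only guarantees two arguments, so running the client with --put and a key but no value would reach args[2] and fail with an ArrayIndexOutOfBoundsException. If you adapt the example, a stricter per-command check might look like the following sketch. ClientArgsSketch and isValid are hypothetical names and are not part of the ActiveJ example.

```java
public class ClientArgsSketch {
    // Returns true only if the arguments match one of the two supported forms:
    //   --put key value
    //   --get key
    static boolean isValid(String[] args) {
        if (args.length == 0) {
            return false;
        }
        switch (args[0]) {
            case "--put":
                return args.length == 3;
            case "--get":
                return args.length == 2;
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        if (!isValid(args)) {
            System.err.println("Command line args:\n\t--put key value\n\t--get key");
            return;
        }
        System.out.println("Arguments accepted: " + String.join(" ", args));
    }
}
```

Unlike the original, this sketch rejects unknown options with the usage message instead of throwing, which is a design choice rather than a requirement.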

Congratulations! We have finished exploring the code of this application.

Testing

First, launch the server. Open the ServerLauncher class and run its main() method.

Then make a PUT request. Open the ClientLauncher class, which is located in activej/examples/tutorials/rpc-kv-storage, and set the program arguments to --put key1 value1. In IntelliJ IDEA: Run -> Edit Configurations -> Run/Debug Configurations -> Program arguments -> --put key1 value1. Then run the main() method of the launcher.

You will see the following output:

PutResponse: {previousValue='null'}

Finally, make a GET request.

Open the ClientLauncher class once again and set the program arguments to --get key1. In IntelliJ IDEA: Run -> Edit Configurations -> Run/Debug Configurations -> Program arguments -> --get key1. Then run the main() method of the launcher.

You will see the following output:

GetResponse: {value='value1'}

Congratulations, you have just launched a remote key-value storage based on the RPC communication protocol!