Skip to main content

Key-Value Storage

In this guide we will look at a remote key-value storage which has 2 basic operations: put and get. When writing distributed application the common concern is what protocol to use for communication. There are two main options:

  • HTTP/REST
  • RPC

While HTTP is more popular and well-specified, it has some overhead. When performance is a significant aspect of application, you should use something faster. For this purpose ActiveJ RPC was designed based on fast serializers and custom optimized communication protocol that allows to significantly improve application performance.

What you will need:

  • IDE or terminal
  • JDK 1.8+
  • Maven 3.0+

To proceed with this guide you have need to download and run a working example

Working Example

First of all, clone ActiveJ project locally:

git clone -b examples-5.2 https://github.com/activej/activej.git

Then open the project in your IDE of choice. Before running the example, build the project (Ctrl + F9 for IntelliJ IDEA).

If you want to skip source code explanation, you can go straight to testing section.

Source code

Basic app functionality

Since we have two basic operations to implement (put and get), let's start with looking at message classes that are used for communication between client and server, specifically PutRequest, PutResponse, GetRequest, and GetResponse. Instances of these classes are serialized by a lightning-fast serializer library ActiveJ Serializer. It requires some meta information about these classes, which is provided by appropriate annotations. The basic rules are:

  • Use @Serialize annotation on properties that need to be serialized.
  • Use @Deserialize annotation with a property name (which should be the same as the one in the getter) in constructor.
  • Use @SerializeNullable on properties that can have null values.

Message classes look like these:

public class PutRequest {
  private final String key;  private final String value;
  public PutRequest(@Deserialize("key") String key, @Deserialize("value") String value) {    this.key = key;    this.value = value;  }
  @Serialize  public String getKey() {    return key;  }
  @Serialize  public String getValue() {    return value;  }}
public class PutResponse {  private final String previousValue;
  public PutResponse(@Deserialize("previousValue") String previousValue) {    this.previousValue = previousValue;  }
  @Serialize  @SerializeNullable  public String getPreviousValue() {    return previousValue;  }
  @Override  public String toString() {    return "{previousValue='" + previousValue + '\'' + '}';  }}
public class GetRequest {
  private final String key;
  public GetRequest(@Deserialize("key") String key) {    this.key = key;  }
  @Serialize  public String getKey() {    return key;  }}
public class GetResponse {  private final String value;
  public GetResponse(@Deserialize("value") String value) {    this.value = value;  }
  @Serialize  @SerializeNullable  public String getValue() {    return value;  }
  @Override  public String toString() {    return "{value='" + value + '\'' + '}';  }}

Next, let's look at a simple implementation of a key-value storage KeyValueStore

public class KeyValueStore {
  private final Map<String, String> store = new HashMap<>();
  public String put(String key, String value) {    return store.put(key, value);  }
  public String get(String key) {    return store.get(key);  }}

A key-value store is very minimalistic. It uses a regular Java's Map to store keys with corresponding values.

Client and server

Now, let's look at an AbstractModule for the RPC server using ActiveJ Inject dependency injection library to handle the get and put requests.

public class ServerModule extends AbstractModule {  private static final int RPC_SERVER_PORT = 5353;
  @Provides  Eventloop eventloop() {    return Eventloop.create()        .withEventloopFatalErrorHandler(rethrow());  }
  @Provides  KeyValueStore keyValueStore() {    return new KeyValueStore();  }
  @Provides  RpcServer rpcServer(Eventloop eventloop, KeyValueStore store) {    return RpcServer.create(eventloop)        .withSerializerBuilder(SerializerBuilder.create())        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)        .withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))        .withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))        .withListenPort(RPC_SERVER_PORT);  }}

As you can see, in order to properly define an RpcServer we have indicated all the message classes that are sent between client and server, and have specified the appropriate RpcRequestHandler for each request class.

We have represented them as the second arguments in these lines using lambdas.

.withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue())))).withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))

Next, let's look at the ServerLauncher for the RPC server. ActiveJ Launcher is used here to manage application lifecycle:

public class ServerLauncher extends Launcher {  @Inject  private RpcServer server;
  @Override  protected Module getModule() {    return combine(        ServiceGraphModule.create(),        new ServerModule());  }
  @Override  protected void run() throws Exception {    awaitShutdown();  }
  public static void main(String[] args) throws Exception {    ServerLauncher launcher = new ServerLauncher();    launcher.launch(args);  }}

Since we have extended Launcher class, we also had to override 2 methods: getModule to provide ServiceGraphModule and run to describe the main logic of the example.

Now, let's look at the RPC client. We have once again indicated all the message classes that are used for communication and have specified the RPC strategy. All strategies can be combined, but since we have only one server, we use a single-server strategy:

public class ClientModule extends AbstractModule {  private static final int RPC_SERVER_PORT = 5353;
  @Provides  Eventloop eventloop() {    return Eventloop.create()        .withEventloopFatalErrorHandler(rethrow())        .withCurrentThread();  }
  @Provides  RpcClient rpcClient(Eventloop eventloop) {    return RpcClient.create(eventloop)        .withConnectTimeout(Duration.ofSeconds(1))        .withSerializerBuilder(SerializerBuilder.create())        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)        .withStrategy(RpcStrategies.server(new InetSocketAddress("localhost", RPC_SERVER_PORT)));  }}

Let's also look at ClientLauncher. In run() we are considering command line arguments and make appropriate requests to RpcServer

public class ClientLauncher extends Launcher {  private static final int TIMEOUT = 1000;
  @Inject  private RpcClient client;
  @Inject  Eventloop eventloop;
  @Override  protected Module getModule() {    return combine(        ServiceGraphModule.create(),        new ClientModule());  }
  @Override  protected void run() throws Exception {    if (args.length < 2) {      System.err.println("Command line args:\n\t--put key value\n\t--get key");      return;    }
    switch (args[0]) {      case "--put":        CompletableFuture<PutResponse> future1 = eventloop.submit(() ->            client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT)        );        PutResponse putResponse = future1.get();        System.out.println("PutResponse: " + putResponse);        break;      case "--get":        CompletableFuture<GetResponse> future2 = eventloop.submit(() ->            client.sendRequest(new GetRequest(args[1]), TIMEOUT)        );        GetResponse getResponse = future2.get();        System.out.println("GetResponse: " + getResponse);        break;      default:        throw new RuntimeException("Unsupported option: " + args[0]);    }  }
  public static void main(String[] args) throws Exception {    ClientLauncher launcher = new ClientLauncher();    launcher.launch(args);  }}

Congratulations! We've finished exploring the code for this app.

Testing

First, launch the server. Open ServerLauncher class and run its main() method.

Then make a PUT request. Open ClientLauncher class which is located at activej/examples/tutorials/rpc-kv-storage and set up program arguments to --put key1 value1. For IntelliJ IDEA: Run -> Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --put key1 value1||. Then run launcher's main() method.

You will see the following output:

PutResponse: {previousValue='null'}

Finally, make a GET request.

Open ClientLauncher class again and set up program arguments to --get key1. For IntelliJ IDEA: Run -> Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --get key1||. Then run main() method of the client launcher.

You will see the following output:

GetResponse: {value='value1'}

Congratulations, you've just launched a remote key-value storage with RPC communication protocol!