Перейти к основному содержанию

Хранение ключей-значений

В этом руководстве мы создадим удаленное хранилище ключей-значений, которое будет иметь 2 основные операции: положить и получить. Когда пишет распределенное приложение, часто возникает вопрос о том, какой протокол использовать для взаимодействия. Существует два основных варианта :

  • HTTP/REST
  • RPC

Хотя HTTP является более популярным и хорошо специфицированным, он имеет некоторые накладные расходы. Когда производительность является важным аспектом приложения, следует использовать что-то более быстрое. Для этого был разработан ActiveJ RPC на основе быстрых сериализаторов и оптимизированного на заказ протокола взаимодействия, что позволяет значительно повысить производительность приложения.

Что вам понадобится:#

  • IDE или терминал
  • JDK 1.8
  • Maven 3.0

Чтобы продолжить работу с этим руководством, у вас есть 2 варианта:#

  • Загрузите и запустите рабочий пример
  • Следуйте пошаговому руководству

Пример работы#

Чтобы запустить пример в IDE, сначала локально клонируйте проект ActiveJ :

git clone https://github.com/activej/activej.git

И импортируйте его как проект Maven. Посмотрите ветку v5.0-beta2.

Перед запуском примера выполните сборку проекта (Ctrl F9 для IntelliJ IDEA).

Затем перейдите в раздел testing .

Пошаговое руководство#

1. Настройте проект#

Сначала создайте папку для приложения и постройте соответствующую структуру проекта:

remote-key-value-storage└── pom.xml└── src    └── main        └── java            └── GetRequest.java            └── GetResponse.java            └── PutRequest.java            └── PutResponse.java            └── KeyValueStore.java            └── ServerModule.java            └── ServerLauncher.java            └── ClientModule.java            └── ClientLauncher.java

Затем настройте файл pom.xml следующим образом:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>
  <groupId>io.activej</groupId>  <artifactId>tutorial-rpc-kv-storage</artifactId>  <version>5.0-SNAPSHOT</version>
  <name>Tutorials : Rpc-KV-Storage</name>
  <properties>    <maven.compiler.source>1.8</maven.compiler.source>    <maven.compiler.target>1.8</maven.compiler.target>  </properties>
  <dependencies>    <dependency>      <groupId>io.activej</groupId>      <artifactId>activej-boot</artifactId>      <version>${project.version}</version>    </dependency>    <dependency>      <groupId>io.activej</groupId>      <artifactId>activej-rpc</artifactId>      <version>${project.version}</version>    </dependency>    <dependency>      <groupId>ch.qos.logback</groupId>      <artifactId>logback-classic</artifactId>      <version>1.2.3</version>    </dependency>  </dependencies></project>

2. Определите базовую функциональность приложения#

Поскольку нам нужно реализовать две основные операции (положить и получить), давайте начнем с написания классов, которые будут использоваться для связи между клиентом и сервером, а именно PutRequest, PutResponse, GetRequestи GetResponse. Экземпляры этих классов будут сериализованы с помощью молниеносной библиотеки сериализатора ActiveJ Serializer. Для этого требуется некоторая метаинформация об этих классах, которая предоставляется соответствующими аннотациями. Основные правила таковы:

  • Используйте аннотацию @Serialize с порядковым номером в геттере свойства. Упорядочивание обеспечивает лучшую совместимость в случае изменения классов.
  • Используйте аннотацию @Deserialize с именем свойства (которое должно быть таким же, как в геттере) в конструкторе.
  • Используйте @SerializeNullable для свойств, которые могут иметь значения null. Поэтому занятия по коммуникации должны выглядеть следующим образом:
public class PutRequest {
  private final String key;  private final String value;
  public PutRequest(@Deserialize("key") String key, @Deserialize("value") String value) {    this.key = key;    this.value = value;  }
  @Serialize  public String getKey() {    return key;  }
  @Serialize  public String getValue() {    return value;  }}
public class PutResponse {  private final String previousValue;
  public PutResponse(@Deserialize("previousValue") String previousValue) {    this.previousValue = previousValue;  }
  @Serialize  @SerializeNullable  public String getPreviousValue() {    return previousValue;  }
  @Override  public String toString() {    return "{previousValue='" + previousValue + '\'' + '}';  }}
public class GetRequest {
  private final String key;
  public GetRequest(@Deserialize("key") String key) {    this.key = key;  }
  @Serialize  public String getKey() {    return key;  }}
public class GetResponse {  private final String value;
  public GetResponse(@Deserialize("value") String value) {    this.value = value;  }
  @Serialize  @SerializeNullable  public String getValue() {    return value;  }
  @Override  public String toString() {    return "{value='" + value + '\'' + '}';  }}

Далее, давайте напишем простую реализацию хранилища ключевых значений KeyValueStore

public class KeyValueStore {
  private final Map<String, String> store = new HashMap<>();
  public String put(String key, String value) {    return store.put(key, value);  }
  public String get(String key) {    return store.get(key);  }}

3. Создание клиента и сервера#

Теперь давайте напишем AbstractModule для RPC-сервера, используя библиотеку ActiveJ Inject dependency injection для обработки запросов get и put .

public class ServerModule extends AbstractModule {  private static final int RPC_SERVER_PORT = 5353;
  @Provides  Eventloop eventloop() {    return Eventloop.create()        .withFatalErrorHandler(rethrowOnAnyError());  }
  @Provides  KeyValueStore keyValueStore() {    return new KeyValueStore();  }
  @Provides  RpcServer rpcServer(Eventloop eventloop, KeyValueStore store) {    return RpcServer.create(eventloop)        .withSerializerBuilder(SerializerBuilder.create())        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)        .withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))        .withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))        .withListenPort(RPC_SERVER_PORT);  }}

Как видите, чтобы правильно создать RpcServer , мы должны указать все классы, которые будут передаваться между клиентом и сервером, и указать соответствующий RpcRequestHandler для каждого класса запроса. Мы представляем их как третьи аргументы в этих строках, используя лямбды.

.withHandler(PutRequest.class, PutResponse.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue())))).withHandler(GetRequest.class, GetResponse.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))

Далее, создайте ServerLauncher для сервера RPC. Используйте ActiveJ Launcher для управления жизненным циклом приложения:

public class ServerLauncher extends Launcher {  @Inject  private RpcServer server;
  @Override  protected Module getModule() {    return combine(        ServiceGraphModule.create(),        new ServerModule());  }
  @Override  protected void run() throws Exception {    awaitShutdown();  }
  public static void main(String[] args) throws Exception {    ServerLauncher launcher = new ServerLauncher();    launcher.launch(args);  }}

Поскольку мы расширяем Launcher, мы также переопределим 2 метода: getModule для предоставления ServiceGraphModule и run для описания основной логики примера. Теперь давайте напишем RPC-клиент. Мы должны указать все классы, которые будут использоваться для связи, и указать стратегию RPC. Все стратегии можно комбинировать, но поскольку у нас только один сервер, мы будем использовать стратегию single-server :

public class ClientModule extends AbstractModule {  private static final int RPC_SERVER_PORT = 5353;
  @Provides  Eventloop eventloop() {    return Eventloop.create()        .withFatalErrorHandler(rethrowOnAnyError())        .withCurrentThread();  }
  @Provides  RpcClient rpcClient(Eventloop eventloop) {    return RpcClient.create(eventloop)        .withConnectTimeout(Duration.ofSeconds(1))        .withSerializerBuilder(SerializerBuilder.create())        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)        .withStrategy(RpcStrategies.server(new InetSocketAddress("localhost", RPC_SERVER_PORT)));  }}

Давайте также построим ClientLauncher. В run() мы рассмотрим аргументы командной строки и сделаем соответствующие запросы к RpcServer.

public class ClientLauncher extends Launcher {  private static final int TIMEOUT = 1000;
  @Inject  private RpcClient client;
  @Inject  Eventloop eventloop;
  @Override  protected Module getModule() {    return combine(        ServiceGraphModule.create(),        new ClientModule());  }
  @Override  protected void run() throws Exception {    if (args.length < 2) {      System.err.println("Command line args:\n\t--put key value\n\t--get key");      return;    }
    switch (args[0]) {      case "--put":        CompletableFuture<PutResponse> future1 = eventloop.submit(() ->            client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT)        );        PutResponse putResponse = future1.get();        System.out.println("PutResponse: " + putResponse);        break;      case "--get":        CompletableFuture<GetResponse> future2 = eventloop.submit(() ->            client.sendRequest(new GetRequest(args[1]), TIMEOUT)        );        GetResponse getResponse = future2.get();        System.out.println("GetResponse: " + getResponse);        break;      default:        throw new RuntimeException("Unsupported option: " + args[0]);    }  }
  public static void main(String[] args) throws Exception {    ClientLauncher launcher = new ClientLauncher();    launcher.launch(args);  }}

Поздравляю! Мы закончили написание кода для этого приложения.

Тестирование#

Сначала запустите сервер. Откройте класс ServerLauncher и запустите его метод main() . Затем сделайте запрос PUT . Откройте класс ClientLauncher , который находится по адресу activej/examples/tutorials/rpc-kv-storage и задайте аргументы программы --put key1 value1. Для IntelliJ IDEA: Run -> Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --put key1 value1||. Затем запустите метод launcher main() . Вы увидите следующий результат:

PutResponse: {previousValue='null'}

Наконец, сделайте запрос GET .

Снова откройте класс ClientLauncher и задайте аргументы программы --get key1. Для IntelliJ IDEA: Run -> Edit configurations -> |Run/Debug Configurations -> |Program arguments -> --get key1||. Затем запустите main() метод клиентской программы запуска.

Вы увидите следующий результат:

GetResponse: {value='value1'}

Поздравляем, вы только что создали удаленное хранилище ключей-значений с помощью протокола связи RPC!