跳到主要内容

键值存储

在本指南中,我们将创建一个远程键值存储,它将有2个基本操作: putget。 当 编写分布式应用程序时,人们普遍关心的是使用什么协议进行通信。 有两个主要的 选项。

  • HTTP/REST
  • RPC

虽然HTTP更流行,而且规范性好,但它有一些开销。 当性能是应用程序的一个重要方面时, ,你应该使用更快的东西。 为此,ActiveJ RPC是基于 快速序列化器和自定义优化的通信协议设计的,可以显著提高应用性能。

你将需要什么。#

  • IDE或终端
  • JDK 1.8
  • Maven 3.0

要继续学习本指南,你有两个选择。#

工作实例#

要在IDE中运行该例子, ,先在本地克隆ActiveJ项目

git clone https://github.com/activej/activej.git

并将其作为一个Maven项目导入。 查阅分支机构 v5.0-beta2

在运行这个例子之前,构建项目(Ctrl F9 for IntelliJ IDEA)。

然后,进入 测试 部分。

分步指南#

1. 设置项目#

首先,为应用程序创建一个文件夹并建立一个适当的项目结构。

remote-key-value-storage└── pom.xml└── src    └── main        └── java            └── GetRequest.java            └── GetResponse.java            └── PutRequest.java            └── PutResponse.java            └── KeyValueStore.java            └── ServerModule.java            └── ServerLauncher.java            └── ClientModule.java            └── ClientLauncher.java

接下来,像这样配置你的 pom.xml 文件。

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">  <modelVersion>4.0.0</modelVersion>
  <groupId>io.activej</groupId>  <artifactId>tutorial-rpc-kv-storage</artifactId>  <version>5.0-SNAPSHOT</version>
  <name>Tutorials : Rpc-KV-Storage</name>
  <properties>    <maven.compiler.source>1.8</maven.compiler.source>    <maven.compiler.target>1.8</maven.compiler.target>  </properties>
  <dependencies>    <dependency>      <groupId>io.activej</groupId>      <artifactId>activej-boot</artifactId>      <version>${project.version}</version>    </dependency>    <dependency>      <groupId>io.activej</groupId>      <artifactId>activej-rpc</artifactId>      <version>${project.version}</version>    </dependency>    <dependency>      <groupId>ch.qos.logback</groupId>      <artifactId>logback-classic</artifactId>      <version>1.2.3</version>    </dependency>  </dependencies></project>

2. 定义基本的应用程序功能#

由于我们有两个基本操作要实现(put and get),让我们开始写下将用于客户端和服务器之间通信的类,具体是 PutRequest, PutResponse, GetRequest, 和 GetResponse。 这些类的实例将由闪电般的序列化库 ActiveJ Serializer。 它需要一些关于这些类的元信息,这些信息是由适当的注释提供的。 基本规则是。

  • 在属性的获取器上使用 @Serialize 注释,并加上一个订单号。 在类被改变的情况下,排序提供了更好的兼容性。
  • 在构造函数中使用 @Deserialize 注释和一个属性名称(应与getter中的名称相同)。
  • 在可以有空值的属性上使用 @SerializeNullable。 因此,沟通的课程应该是这样的。
public class PutRequest {
  private final String key;  private final String value;
  public PutRequest(@Deserialize("key") String key, @Deserialize("value") String value) {    this.key = key;    this.value = value;  }
  @Serialize  public String getKey() {    return key;  }
  @Serialize  public String getValue() {    return value;  }}
public class PutResponse {  private final String previousValue;
  public PutResponse(@Deserialize("previousValue") String previousValue) {    this.previousValue = previousValue;  }
  @Serialize  @SerializeNullable  public String getPreviousValue() {    return previousValue;  }
  @Override  public String toString() {    return "{previousValue='" + previousValue + '\'' + '}';  }}
public class GetRequest {
  private final String key;
  public GetRequest(@Deserialize("key") String key) {    this.key = key;  }
  @Serialize  public String getKey() {    return key;  }}
public class GetResponse {  private final String value;
  public GetResponse(@Deserialize("value") String value) {    this.value = value;  }
  @Serialize  @SerializeNullable  public String getValue() {    return value;  }
  @Override  public String toString() {    return "{value='" + value + '\'' + '}';  }}

接下来,让我们写一个简单的键值存储的实现 KeyValueStore

public class KeyValueStore {
  private final Map<String, String> store = new HashMap<>();
  public String put(String key, String value) {    return store.put(key, value);  }
  public String get(String key) {    return store.get(key);  }}

3. 创建客户端和服务器#

现在,让我们为RPC服务器写下一个 AbstractModule ,使用 ActiveJ Inject 依赖注入库来处理 getput 请求。

public class ServerModule extends AbstractModule {  private static final int RPC_SERVER_PORT = 5353;
  @Provides  Eventloop eventloop() {    return Eventloop.create()        .withFatalErrorHandler(rethrowOnAnyError());  }
  @Provides  KeyValueStore keyValueStore() {    return new KeyValueStore();  }
  @Provides  RpcServer rpcServer(Eventloop eventloop, KeyValueStore store) {    return RpcServer.create(eventloop)        .withSerializerBuilder(SerializerBuilder.create())        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)        .withHandler(PutRequest.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue()))))        .withHandler(GetRequest.class, req -> Promise.of(new GetResponse(store.get(req.getKey()))))        .withListenPort(RPC_SERVER_PORT);  }}

正如你所看到的,为了正确创建一个 RpcServer ,我们应该指出所有将在客户端和服务器之间发送的类,并为每个请求类指定适当的 RpcRequestHandler。 我们使用lambdas将它们表示为这些行中的第三个参数。

.withHandler(PutRequest.class, PutResponse.class, req -> Promise.of(new PutResponse(store.put(req.getKey(), req.getValue() ))))).withHandler(GetRequest.class, GetResponse.class, req -> Promise.of(new GetResponse(store.get(req.getKey() )))))

接下来,创建 服务器启动程序"。 为RPC服务器。 使用 ActiveJ Launcher 来管理应用程序的生命周期。

public class ServerLauncher extends Launcher {  @Inject  private RpcServer server;
  @Override  protected Module getModule() {    return combine(        ServiceGraphModule.create(),        new ServerModule());  }
  @Override  protected void run() throws Exception {    awaitShutdown();  }
  public static void main(String[] args) throws Exception {    ServerLauncher launcher = new ServerLauncher();    launcher.launch(args);  }}

由于我们扩展了 Launcher,我们也将覆盖2个方法: getModule ,以提供 ServiceGraphModulerun ,以描述该例子的主要逻辑。 现在,让我们来编写RPC客户端。 我们应该指出所有将用于通信的类,并指定 RPC策略。 所有的策略都可以结合起来,但由于我们只有一台服务器,我们将使用 single-server 策略。

public class ClientModule extends AbstractModule {  private static final int RPC_SERVER_PORT = 5353;
  @Provides  Eventloop eventloop() {    return Eventloop.create()        .withFatalErrorHandler(rethrowOnAnyError())        .withCurrentThread();  }
  @Provides  RpcClient rpcClient(Eventloop eventloop) {    return RpcClient.create(eventloop)        .withConnectTimeout(Duration.ofSeconds(1))        .withSerializerBuilder(SerializerBuilder.create())        .withMessageTypes(PutRequest.class, PutResponse.class, GetRequest.class, GetResponse.class)        .withStrategy(RpcStrategies.server(new InetSocketAddress("localhost", RPC_SERVER_PORT)));  }}

让我们也建立 ClientLauncher。 在 run() ,我们将考虑命令行参数并向 RpcServer提出适当的请求。

public class ClientLauncher extends Launcher {  private static final int TIMEOUT = 1000;
  @Inject  private RpcClient client;
  @Inject  Eventloop eventloop;
  @Override  protected Module getModule() {    return combine(        ServiceGraphModule.create(),        new ClientModule());  }
  @Override  protected void run() throws Exception {    if (args.length < 2) {      System.err.println("Command line args:\n\t--put key value\n\t--get key");      return;    }
    switch (args[0]) {      case "--put":        CompletableFuture<PutResponse> future1 = eventloop.submit(() ->            client.sendRequest(new PutRequest(args[1], args[2]), TIMEOUT)        );        PutResponse putResponse = future1.get();        System.out.println("PutResponse: " + putResponse);        break;      case "--get":        CompletableFuture<GetResponse> future2 = eventloop.submit(() ->            client.sendRequest(new GetRequest(args[1]), TIMEOUT)        );        GetResponse getResponse = future2.get();        System.out.println("GetResponse: " + getResponse);        break;      default:        throw new RuntimeException("Unsupported option: " + args[0]);    }  }
  public static void main(String[] args) throws Exception {    ClientLauncher launcher = new ClientLauncher();    launcher.launch(args);  }}

祝贺你! 我们已经完成了这个应用程序的代码编写。

测试#

首先,启动服务器。 打开 ServerLauncher 类,运行其 main() 方法。 然后做一个 PUT 请求。 打开 ClientLauncher 类,该类位于 activej/examples/tutorials/rpc-kv-storage 并将程序参数设置为 --put key1 value1。 对于IntelliJ IDEA: 运行 -> 编辑配置 -> |运行/调试配置 -> |程序参数 -> --put key1 value1||。 然后运行发射器的 main() 方法。 你将看到以下输出。

PutResponse:{previousValue='null'}

最后,做一个 GET 请求。

再次打开 ClientLauncher 类,并将程序参数设置为 --get key1。 对于IntelliJ IDEA: 运行 -> 编辑配置 -> |运行/调试配置 -> |程序参数 -> --get key1||。 然后运行 main() 方法的 client launcher。

你将看到以下输出。

GetResponse:{value='value1'}

恭喜你,你刚刚用RPC通信协议创建了一个远程键值存储。