跳到主要内容

类似记忆库的应用

在本指南中,我们将创建一个类似于memcached的客户-服务器应用程序,它是基于RPC通信协议 和ActiveJ技术。

你可以在 GitHub,找到完整的例子来源。

Memcached客户端和服务器模块#

首先,考虑这些模块的初始ActiveJ RPC实现,因为我们的应用程序将在 它们的帮助下建立。

note

这个实现只包括基本的用法。 你可以根据你的应用需要增加更多的功能。

创建客户端和服务器#

让我们来写我们自己的 MemcacheLikeServer 服务器。 我们还将使用 ActiveJ Launcher 来管理应用程序的生命周期和 闪电般的依赖注入库 ActiveJ Inject

public class MemcacheLikeServer extends Launcher {  @Inject  WorkerPool.Instances<RpcServer> instances;
  @Provides  WorkerPool workerPool(WorkerPools workerPools) {    return workerPools.createPool(3);  }
  @Provides  Config config() {    return Config.create()        .with("memcache.buffers", "4")        .with("memcache.bufferCapacity", "64mb");  }
  @Override  protected Module getModule() {    return ModuleBuilder.create()        .install(ServiceGraphModule.create())        .install(MemcacheMultiServerModule.create())        .install(WorkerPoolModule.create())        .build();  }
  @Override  protected void run() throws Exception {    awaitShutdown();  }
  public static void main(String[] args) throws Exception {    MemcacheLikeServer server = new MemcacheLikeServer();    server.launch(args);  }}

由于我们扩展了 Launcher,我们需要覆盖2个方法: getModule ,以提供 ServiceGraphModulerun 来描述该例子的主要逻辑。

public class MemcacheLikeClient extends Launcher {  @Provides  Eventloop eventloop() {    return Eventloop.create();  }
  @Provides  RawMemcacheClientAdapter rawMemcacheClientAdapter(RawMemcacheClient client) {    return new RawMemcacheClientAdapter(client);  }
  @Provides  Config config() {    return Config.create()        .with("protocol.compression", "false")        .with("client.addresses", "localhost:9000, localhost:9001, localhost:9002");  }
  @Inject  RawMemcacheClientAdapter client;
  @Inject  Eventloop eventloop;
  @Override  protected Module getModule() {    return ModuleBuilder.create()        .install(ServiceGraphModule.create())        .install(MemcacheClientModule.create())        .install(ConfigModule.create()            .withEffectiveConfigLogger())        .build();  }
  @Override  protected void run() throws ExecutionException, InterruptedException {    String message = "Hello, Memcached Server";
    System.out.println();    CompletableFuture<Void> future = eventloop.submit(() ->        sequence(            () -> Promises.all(range(0, 25).mapToObj(i ->                client.put(i, message))),            () -> Promises.all(range(0, 25).mapToObj(i ->                client.get(i).whenResult(res -> System.out.println(i + " : " + res))))));    future.get();    System.out.println();  }
  public static void main(String[] args) throws Exception {    Launcher client = new MemcacheLikeClient();    client.launch(args);  }}
  • 由于 MemcacheClientModule 使用Rendezvous Hashing策略来实现 服务器分片之间的请求协议,在客户端我们需要提供这些分片的地址-- 9001, 9002, 和 9003 端口。
  • Eventloop ,我们要求将 ,在当前 <em x-id="3">i</em> 的 <em x-id="3">bytes[i]</em> 数组中放入一条信息, , <code>,从相应的单元中得到
  • 因此,客户端将对三个碎片异步执行这些操作,因此我们将收到一个无序的输出块作为结果。
  • RawMemcacheClientAdapter 是一个手动创建的类。 它使用 RawMemcacheClient (一个 MemcacheClient 实现),并为我们的 MemcacheLikeClient定义了 putget 方法的逻辑。

运行应用程序#

  • 首先,打开并运行 MemcacheLikeServer ,启动服务器。
  • 要启动客户端,运行 main() MemcacheLikeClient的方法。 你会在服务器的控制台看到大致相同的输出(... 省略了类似的信息)。
服务器在端口#9000上接受消息!服务器在端口#9000上接受消息!...服务器在端口#9000上接受消息!服务器在端口#9002接受消息!服务器在端口#9002接受消息!...服务器在端口#9002接受消息!服务器在#9001端口接受消息!...服务器在#9001端口接受消息!

在客户端控制台,你会看到一个类似的输出,确认客户端从服务器收到了异步响应。

0 : Hello, Memcached Server3 : Hello, Memcached Server4 : Hello, Memcached Server...11 : Hello, Memcached Server13 : Hello, Memcached Server...20 : Hello, Memcached Server21 : Hello, Memcached Server24 : Hello, Memcached Server...17 : 你好,Memcached服务器19 : 你好,Memcached服务器