跳到主要内容

基本实例

note

要运行例子,你需要从GitHub克隆ActiveJ

git clone https://github.com/activej/activej

并将其作为一个Maven项目导入。 查看标签 v5.0-beta2。 在运行这些例子之前,先建立项目。 简单的RPC实例 ,位于 activej/examples/cloud/rpc。

RPC策略的例子位于 activej/cloud-rpc/src/test/RpcStrategiesTest。

简单的RPC实例#

在 "Hello World "客户端和服务器 RPC实例,客户端发送了一个包含单词 "World " 的请求给服务器。 当 服务器收到它时,它将发送一个包含单词 "Hello "的响应。 如果一切顺利完成,我们会得到 以下输出。

得到的结果。你好,世界

让我们看一下实施情况。

public class RpcExample extends Launcher {  private static final int SERVICE_PORT = 34765;
  @Inject  private RpcClient client;
  @Inject  private RpcServer server;
  @Inject  private Eventloop eventloop;
  @Provides  Eventloop eventloop() {    return Eventloop.create();  }
  @Provides  RpcServer rpcServer(Eventloop eventloop) {    return RpcServer.create(eventloop)        .withMessageTypes(String.class)        .withHandler(String.class,            request -> Promise.of("Hello " + request))        .withListenPort(SERVICE_PORT);  }
  @Provides  RpcClient rpcClient(Eventloop eventloop) {    return RpcClient.create(eventloop)        .withMessageTypes(String.class)        .withStrategy(server(new InetSocketAddress(SERVICE_PORT)));  }
  @ProvidesIntoSet  Initializer<ServiceGraphModuleSettings> configureServiceGraph() {    // add logical dependency so that service graph starts client only after it started the server    return settings -> settings.addDependency(Key.of(RpcClient.class), Key.of(RpcServer.class));  }
  @Override  protected Module getModule() {    return ServiceGraphModule.create();  }
  @Override  protected void run() throws ExecutionException, InterruptedException {    CompletableFuture<Object> future = eventloop.submit(() ->        client.sendRequest("World", 1000)    );    System.out.printf("%nRPC result: %s %n%n", future.get());  }
  public static void main(String[] args) throws Exception {    RpcExample example = new RpcExample();    example.launch(args);  }}

RpcExample class extends ActiveJ Launcher to help us manage application lifecycle. 接下来,我们使用依赖注入库 ActiveJ Inject 来提供 RpcServer呼叫中心 具备相关的配置和所需的依赖性。 RpcClient 根据所提供的信息,向指定的服务器发送带有字符串信息的请求。 RPC战略 (得到一个单一的RPC服务)。 对于 RpcServer ,我们定义了要进行的消息的类型,一个相应的 呼叫中心(RpcRequestHandler 和一个监听器端口。 由于我们扩展了 Launcher,我们还将覆盖2个方法: getModule ,以提供 ServiceGraphModulerun 来描述这个例子的主要逻辑。 最后,我们定义 main 方法,它将启动我们的例子。 你可以在以下网站上找到实例来源 GitHub

圆环战略#

ActiveJ RPC包含预定义的策略,用于RPC服务器或服务器分片之间的请求安排。 圆滚滚 是最简单的策略之一:它只是以循环的方式一个接一个地穿过服务器或碎片。 在这个例子中,我们创建了一个RPC ,有5个相等的 连接 ,并为它们设置了Round-Robin策略。 接下来,我们用之前定义的策略为水池创建一个发送器。 就是这样,100个请求将在服务器之间平均分配。

public void roundRobinTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int iterations = 100;  RpcStrategy strategy = roundRobin(servers(address1, address2, address3, address4, address5));
  RpcSender sender = strategy.createSender(pool);  for (int i = 0; i < iterations; i++) {    sender.sendRequest(new Object(), 50, ignore());  }
  List<RpcSenderStub> connections =      asList(connection1, connection2, connection3, connection4, connection5);  for (int i = 0; i < 5; i++) {    assertEquals(iterations / 5, connections.get(i).getRequests());  }}

你可以在以下网站上找到实例来源 GitHub

圆周率和先发制人战略相结合#

你可以简单地结合RPC策略。 在这个例子中,我们将结合 循环赛首次提供 战略。 首先,我们创建4个连接,而不把 connection3 到池中。 然后我们开始发送20个请求。 因此,所有的请求将被平均分配到 connection1 (因为它总是 第一个可用的)和 connection4 (因为 connection3 不在池子里)。

public void roundRobinTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int iterations = 100;  RpcStrategy strategy = roundRobin(servers(address1, address2, address3, address4, address5));
  RpcSender sender = strategy.createSender(pool);  for (int i = 0; i < iterations; i++) {    sender.sendRequest(new Object(), 50, ignore());  }
  List<RpcSenderStub> connections =      asList(connection1, connection2, connection3, connection4, connection5);  for (int i = 0; i < 5; i++) {    assertEquals(iterations / 5, connections.get(i).getRequests());  }}

你可以在以下网站上找到实例来源 GitHub

分片管理和首次有效策略的结合#

你也可以创建自己的分片函数,并在需要时将其与其他策略相结合。 在这个例子中,我们创建了5个相等的连接,但没有把 connection2 到池中。 接下来,我们提供一个简单的分片功能,根据请求的内容在分片之间分配请求。 我们把连接分成两个碎片,并为这两个碎片设置 First Valid Result 策略。 这种策略向所有可用的服务器发送请求。 现在,我们手动发送7个请求: 4,其中 0 消息,所以它们将被发送到第一个分片的 connection1* * 3,其中 1,所以它们都将被发送到第二个分片的所有三个连接上

public void roundRobinTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int iterations = 100;  RpcStrategy strategy = roundRobin(servers(address1, address2, address3, address4, address5));
  RpcSender sender = strategy.createSender(pool);  for (int i = 0; i < iterations; i++) {    sender.sendRequest(new Object(), 50, ignore());  }
  List<RpcSenderStub> connections =      asList(connection1, connection2, connection3, connection4, connection5);  for (int i = 0; i < 5; i++) {    assertEquals(iterations / 5, connections.get(i).getRequests());  }}

你可以在以下网站上找到实例来源 GitHub

会合哈希策略#

会合散列策略预先计算了以下的散列函数 RpcSender 并创建一个RPC服务器的地图。 地图存储在缓存中,只有在服务器上线/下线时才会重新计算。 在这个例子中,请求将在 connection1, connection2, 和 connection3之间平均分配。

public void roundRobinTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int iterations = 100;  RpcStrategy strategy = roundRobin(servers(address1, address2, address3, address4, address5));
  RpcSender sender = strategy.createSender(pool);  for (int i = 0; i < iterations; i++) {    sender.sendRequest(new Object(), 50, ignore());  }
  List<RpcSenderStub> connections =      asList(connection1, connection2, connection3, connection4, connection5);  for (int i = 0; i < 5; i++) {    assertEquals(iterations / 5, connections.get(i).getRequests());  }}

当我们从池中删除一些连接时,哈希函数会被重新计算。

  pool.remove(address3);  pool.remove(address4);  sender = strategy.createSender(pool);  for (int i = 0; i < iterationsPerLoop; i++) {    sender.sendRequest(i, 50, ignore());  }
  double acceptableError = iterationsPerLoop / 10.0;  assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection1.getRequests(), acceptableError);  assertEquals(0, connection2.getRequests());  assertEquals(iterationsPerLoop / 3.0, connection3.getRequests(), acceptableError);  assertEquals(0, connection4.getRequests());  assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection5.getRequests(), acceptableError);}

你可以在以下网站上找到实例来源 GitHub

类型 调度策略#

这种策略只是根据请求的类型在分片之间分配请求。 在例子中 ,所有 字符串 的请求都是在第一个分片上发送的,该分片对服务器有 第一个有效结果 策略。 所有其他类型的请求 ,以 First Available 策略发送到第二个分片。 因此, connection1connection2 将处理35个请求, connection3 - 25个请求,而 connection4connection5 - 0个请求 ,因为 connection3 总是 First Available

  pool.remove(address3);  pool.remove(address4);  sender = strategy.createSender(pool);  for (int i = 0; i < iterationsPerLoop; i++) {    sender.sendRequest(i, 50, ignore());  }
  double acceptableError = iterationsPerLoop / 10.0;  assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection1.getRequests(), acceptableError);  assertEquals(0, connection2.getRequests());  assertEquals(iterationsPerLoop / 3.0, connection3.getRequests(), acceptableError);  assertEquals(0, connection4.getRequests());  assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection5.getRequests(), acceptableError);}

你可以在以下网站上找到实例来源 GitHub