Перейти к основному содержанию

Основные примеры

note

Чтобы запустить примеры, необходимо клонировать ActiveJ с GitHub

git clone https://github.com/activej/activej

И импортируйте его как проект Maven. Посмотрите тег v5.0-beta2. Перед запуском примеров выполните сборку проекта. Простые примеры RPC расположены по адресу activej/examples/cloud/rpc.

Примеры стратегий RPC расположены по адресу activej/cloud-rpc/src/test/RpcStrategiesTest.

Простой пример RPC#

В примере RPC "Hello World" клиента и сервера ****, клиент посылает серверу запрос, содержащий слово "World" . Когда сервер получает его, он отправляет ответ, содержащий слово "Hello ". Если все завершится успешно, мы получим следующий вывод :

Получен результат: Hello World

Давайте посмотрим на реализацию:

public class RpcExample extends Launcher {  private static final int SERVICE_PORT = 34765;
  @Inject  private RpcClient client;
  @Inject  private RpcServer server;
  @Inject  private Eventloop eventloop;
  @Provides  Eventloop eventloop() {    return Eventloop.create();  }
  @Provides  RpcServer rpcServer(Eventloop eventloop) {    return RpcServer.create(eventloop)        .withMessageTypes(String.class)        .withHandler(String.class,            request -> Promise.of("Hello " + request))        .withListenPort(SERVICE_PORT);  }
  @Provides  RpcClient rpcClient(Eventloop eventloop) {    return RpcClient.create(eventloop)        .withMessageTypes(String.class)        .withStrategy(server(new InetSocketAddress(SERVICE_PORT)));  }
  @ProvidesIntoSet  Initializer<ServiceGraphModuleSettings> configureServiceGraph() {    // add logical dependency so that service graph starts client only after it started the server    return settings -> settings.addDependency(Key.of(RpcClient.class), Key.of(RpcServer.class));  }
  @Override  protected Module getModule() {    return ServiceGraphModule.create();  }
  @Override  protected void run() throws ExecutionException, InterruptedException {    CompletableFuture<Object> future = eventloop.submit(() ->        client.sendRequest("World", 1000)    );    System.out.printf("%nRPC result: %s %n%n", future.get());  }
  public static void main(String[] args) throws Exception {    RpcExample example = new RpcExample();    example.launch(args);  }}

Класс RpcExample расширяет ActiveJ Launcher , чтобы помочь нам управлять жизненным циклом приложения . Далее мы используем библиотеку Dependency Injection ActiveJ Inject , чтобы обеспечить RpcServer и RpcClient с соответствующими конфигурациями и необходимыми зависимостями. RpcClient отправляет запросы с сообщением String на указанный сервер в соответствии с предоставленными параметрами. Стратегия RPC (получение одного RPC-сервиса). Для RpcServer мы определяем тип сообщений, которые будут поступать, соответствующий RpcRequestHandler и порт слушателя. Поскольку мы расширяем Launcher, мы также переопределим 2 метода: getModule для предоставления ServiceGraphModule и run для описания основной логики примера. Наконец, мы определим метод main , который запустит наш пример. Примеры источников можно найти на GitHub

Стратегия "круглого ромба#

ActiveJ RPC содержит предопределенные стратегии для организации запросов между RPC-серверами или осколками серверов. Round-Robin является одной из самых простых стратегий: она просто циклически перебирает серверы или осколки один за другим. В этом примере мы создаем пул RPC * с 5 одинаковыми соединениями* и устанавливаем для них стратегию Round-Robin. Далее мы создаем отправителя для пула с помощью ранее определенной стратегии. Вот и все, 100 запросов будут равномерно распределены между серверами:

public void roundRobinTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int iterations = 100;  RpcStrategy strategy = roundRobin(servers(address1, address2, address3, address4, address5));
  RpcSender sender = strategy.createSender(pool);  for (int i = 0; i < iterations; i++) {    sender.sendRequest(new Object(), 50, ignore());  }
  List<RpcSenderStub> connections =      asList(connection1, connection2, connection3, connection4, connection5);  for (int i = 0; i < 5; i++) {    assertEquals(iterations / 5, connections.get(i).getRequests());  }}

Примеры источников можно найти на GitHub

Комбинированные стратегии "круглый робин" и "первый доступный#

Вы можете просто комбинировать стратегии RPC. В этом примере мы объединим Раунд Робин и Первый доступный стратегии. Сначала мы создаем 4 соединения, не помещая connection3 в пул. Затем мы начинаем отправлять 20 запросов. В результате все запросы будут поровну распределены между соединением1 (так как оно всегда первым доступно) и соединением4 (так как соединение3 не доступно для пула):

public void roundRobinAndFirstAvailableTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  // we don't put connection3  pool.put(address4, connection4);  int iterations = 20;  RpcStrategy strategy = roundRobin(      firstAvailable(servers(address1, address2)),      firstAvailable(servers(address3, address4)));
  RpcSender sender = strategy.createSender(pool);  for (int i = 0; i < iterations; i++) {    sender.sendRequest(new Object(), 50, assertNoCalls());  }
  assertEquals(iterations / 2, connection1.getRequests());  assertEquals(0, connection2.getRequests());  assertEquals(0, connection3.getRequests());  assertEquals(iterations / 2, connection4.getRequests());}

Примеры источников можно найти на GitHub

Объединение стратегий шардинга и первой валидной стратегии#

Вы также можете создавать свои собственные функции шардинга и при необходимости комбинировать их с другими стратегиями. В этом примере мы создаем 5 одинаковых соединений, но не помещаем connection2 в пул. Далее мы предоставляем простую функцию шардинга, которая распределяет запросы между шардами в соответствии с содержанием запроса. Мы разделили соединения на два осколка и установили стратегию First Valid Result для обоих из них. Эта стратегия посылает запрос на все доступные серверы. Теперь мы вручную отправляем 7 запросов: 4 с сообщением 0 , поэтому они будут отправлены на соединение первого шарда1* * 3 с сообщением 1, поэтому все они будут отправлены на все три соединения второго шарда.

public void shardingAndFirstValidTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  // we don't put connection2  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int shardsCount = 2;  ShardingFunction<Integer> shardingFunction = item -> item % shardsCount;  RpcStrategy strategy = sharding(shardingFunction,      firstValidResult(servers(address1, address2)),      firstValidResult(servers(address3, address4, address5)));
  RpcSender sender = strategy.createSender(pool);  sender.sendRequest(0, 50, assertNoCalls());  sender.sendRequest(0, 50, assertNoCalls());  sender.sendRequest(1, 50, assertNoCalls());  sender.sendRequest(1, 50, assertNoCalls());  sender.sendRequest(0, 50, assertNoCalls());  sender.sendRequest(0, 50, assertNoCalls());  sender.sendRequest(1, 50, assertNoCalls());
  assertEquals(4, connection1.getRequests());  assertEquals(0, connection2.getRequests());  assertEquals(3, connection3.getRequests());  assertEquals(3, connection4.getRequests());  assertEquals(3, connection5.getRequests());}

Примеры источников можно найти на GitHub

Стратегия рандеву хэширования#

Стратегия хэширования Rendezvous предварительно вычисляет хэш-функцию для RpcSender и создает карту серверов RPC. Карта хранится в кэше и будет пересчитана только при переходе серверов в онлайн/офлайн режим. В данном примере запросы будут равномерно распределены между соединение1, соединение2, и соединение3:

public void rendezvousHashingTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  HashFunction<Integer> hashFunction = item -> item;  RpcStrategy strategy = rendezvousHashing(hashFunction)      .withShard(1, firstAvailable(servers(address1, address2)))      .withShard(2, firstAvailable(servers(address3, address4)))      .withShard(3, server(address5));  int iterationsPerLoop = 1000;  RpcSender sender;
  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  sender = strategy.createSender(pool);  for (int i = 0; i < iterationsPerLoop; i++) {    sender.sendRequest(i, 50, ignore());  }

Когда мы удаляем некоторые соединения из пула, хэш-функция пересчитывается:

  pool.remove(address3);  pool.remove(address4);  sender = strategy.createSender(pool);  for (int i = 0; i < iterationsPerLoop; i++) {    sender.sendRequest(i, 50, ignore());  }
  double acceptableError = iterationsPerLoop / 10.0;  assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection1.getRequests(), acceptableError);  assertEquals(0, connection2.getRequests());  assertEquals(iterationsPerLoop / 3.0, connection3.getRequests(), acceptableError);  assertEquals(0, connection4.getRequests());  assertEquals(iterationsPerLoop / 3.0 + iterationsPerLoop / 2.0, connection5.getRequests(), acceptableError);}

Примеры источников можно найти на GitHub

Тип Диспетчерская стратегия#

Эта стратегия просто распределяет запросы между шардами в соответствии с типом запроса. В примере все запросы String отправляются на первый шард, который имеет стратегию First Valid Result для серверов. Запросы со всеми остальными типами отправляются на второй шард со стратегией First Available . В результате соединение1 и соединение2 обработают 35 запросов, соединение3 - 25 запросов, а соединение4 и соединение5 - 0 запросов , так как соединение3 всегда было First Available:

public void typeDispatchTest() {  RpcClientConnectionPoolStub pool = new RpcClientConnectionPoolStub();  RpcSenderStub connection1 = new RpcSenderStub();  RpcSenderStub connection2 = new RpcSenderStub();  RpcSenderStub connection3 = new RpcSenderStub();  RpcSenderStub connection4 = new RpcSenderStub();  RpcSenderStub connection5 = new RpcSenderStub();  pool.put(address1, connection1);  pool.put(address2, connection2);  pool.put(address3, connection3);  pool.put(address4, connection4);  pool.put(address5, connection5);  int timeout = 50;  int iterationsPerDataStub = 25;  int iterationsPerDataStubWithKey = 35;  RpcSender sender;  RpcStrategy strategy = typeDispatching()      .on(String.class,          firstValidResult(servers(address1, address2)))      .onDefault(          firstAvailable(servers(address3, address4, address5)));
  sender = strategy.createSender(pool);  for (int i = 0; i < iterationsPerDataStub; i++) {    sender.sendRequest(new Object(), timeout, assertNoCalls());  }  for (int i = 0; i < iterationsPerDataStubWithKey; i++) {    sender.sendRequest("request", timeout, assertNoCalls());  }
  assertEquals(iterationsPerDataStubWithKey, connection1.getRequests());  assertEquals(iterationsPerDataStubWithKey, connection2.getRequests());  assertEquals(iterationsPerDataStub, connection3.getRequests());  assertEquals(0, connection4.getRequests());  assertEquals(0, connection5.getRequests());}

Примеры источников можно найти на GitHub