Перейти к основному содержанию

Workers

Обзор

Рабочие (WorkerPoolModule в частности) является удобным способом инжектирования нескольких зависимостей одного типа. Это полезно, когда вам нужен, например, одинаковый набор зависимостей для каждого потока. ActiveJ в значительной степени использует рабочие для реализации многопоточности. Workers являются основой потоковой модели ActiveJ.

Потоковая модель ActiveJ

Основная миссия ActiveJ заключается в создании в конечном итоге быстрой, масштабируемой, простой в использовании и высокоабстрактной модели программирования I/O async . Для достижения этой цели принципы проектирования ActiveJ позволяют преодолеть все накладные расходы и сложности традиционной модели многопоточного программирования и при этом полностью использовать возможности многопоточности Java. ActiveJ предлагает средства разделения приложения на Primary Eventloop поток и Worker Eventloop потоки. Эти потоки взаимодействуют друг с другом посредством передачи сообщений и потокобезопасных сервисов singleton, специфичных для конкретного приложения.

Eventloop поток - это, по сути, однопоточное мини-приложение (похожее на Node.js), которое обрабатывает свою часть задач ввода-вывода и выполняет Runnables, переданные из других потоков. Первичные потоки Eventloop распределяют и балансируют задачи ввода-вывода между рабочими потоками.

graph TB id1(Primary Event Loop) --> id2(Worker Balancer) id2 --> id4 subgraph Worker Scope id3(Event Loop) --> id4(HTTP Server) end id2 --> id6 subgraph Worker Scope id5(Event Loop) --> id6(HTTP Server) end

Преимущества потоковой модели ActiveJ:

  • Каждый основной/рабочий поток Eventloop работает как однопоточное приложение, которое просто программировать и рассуждать о нем
  • Отсутствуют многопоточные накладные расходы, гонки и накладные расходы на синхронизацию потоков
  • Традиционная сила Java в многопоточном программировании полностью сохранена:
  • типичная нагрузка ввода-вывода может быть легко распределена между рабочими потоками
  • приложение может иметь потокобезопасные синглтонные сервисы, которые используются потоками Eventloop, и огромный синглтон состояния данных, разделяемый между всеми рабочими потоками
  • вы все еще можете использовать некоторые алгоритмы синхронизации потоков / алгоритмы без блокировки, просто старайтесь избегать чрезмерной блокировки одновременных потоков.
  • полная совместимость между Java Threads, Thread Pools, Java Futures и даже блокирующими операциями ввода/вывода

Worker Scope

Проблема

Такая конструкция вызывает некоторые вопросы по реализации. Например, если мы хотим реализовать многопоточное HTTP веб-приложение с рабочими циклами событий:

  • в соответствии с этими принципами проектирования, нам необходимо создать отдельные экземпляры рабочего цикла событий, однопоточного сервера HTTP и его сервлетов для каждого рабочего потока
  • но что если наше приложение имеет 8 потоков eventloop с 10 компонентами рабочих потоков внутри, должны ли мы создать 80 из компонентов в целом и назначить их каждому рабочему потоку?
  • как вообще возможно вручную инстанцировать, подключить, инициализировать и запустить/остановить все эти компоненты в правильном порядке, а также изящно завершить работу приложения при ошибках запуска/остановки?

Решение

К счастью, благодаря ActiveJ Injectу нас есть решение - @Worker scope. Если вам необходимо реализовать несколько рабочих потоков:

  • включить WorkerPoolModule модуль и создать WorkerPool экземпляр
  • аннотируйте компоненты, которые вы хотите поместить в каждый рабочий поток, с помощью аннотации @Worker scope

WorkerPool будет автоматически создавать идентичные графы зависимостей для каждого из этих рабочих потоков Вы ни в коем случае не ограничены вышеупомянутой схемой с одним основным Eventloop и N рабочими Eventloop:

  • вы все еще можете иметь совершенно несвязанные / автономные эвентлупы (ни основной, ни рабочий)
  • несколько основных событийных циклов совместно используют один и тот же пул рабочих событийных циклов или несколько наборов рабочих пулов с различным количеством или потоками
  • вы даже можете определить свои собственные @Worker аннотации, и создать несколько пулов рабочих с совершенно несвязанными и различными графами зависимостей Все это в полностью прозрачных и простых для понимания модулях - просто пометьте различные компоненты соответствующими аннотациями рабочих и позвольте WorkerPool создать все экземпляры.

Чтобы автоматически запускать/останавливать компоненты приложения в правильном порядке, просто включите модуль ServiceGraph в ваш Launcher - он знает о пулах рабочих и будет рассматривать векторы рабочих экземпляров как специальные составные синглтоноподобные экземпляры.

Например, вот пример использования MultithreadedHttpServerLauncher в котором есть ServiceGraphModule:

public final class MultithreadedHttpServerExample extends MultithreadedHttpServerLauncher {  @Provides  @Worker  AsyncServlet servlet(@WorkerId int workerId) {    return request -> HttpResponse.ok200()        .withPlainText("Hello from worker server #" + workerId + "\n");  }
  public static void main(String[] args) throws Exception {    MultithreadedHttpServerExample example = new MultithreadedHttpServerExample();    example.launch(args);  }}

Граф зависимостей выглядит следующим образом:

graph subgraph "@Worker()" id1(AsyncHTTPServer) --> id2(EventLoop) id1 --> id3(Async Servlet) id3 --> id4("@WorkerId() int") end id2 -.-> ThrottlingController id2 --> id5(Config) id1 --> id5 id9 --> id5 id6(EventLoop) --> id5 id7(Primary Server) --> id5 id7 --> id6 id7 --> id8(WorkerPool$Intsnces) id8 --> id9(WorkerPool) id9 --> id10(WorkerPools) id10 --> Injector

Чтобы помочь вам понять, как работают рабочие пулы, вот упрощенная реализация WorkerPool в двух словах (фактическая реализация отличается, но не сильно):

public final class WorkerPool {    private final Scope scope;    private final Injector[] scopeInjectors;
    WorkerPool(Injector injector, Scope scope, int workers) {        this.scope = scope;        this.scopeInjectors = new Injector[workers];        for (int i = 0; i < workers; i ) {            scopeInjectors[i] = injector.enterScope(scope, new HashMap<>(), false);        }    }
    public <T> Instances<T> getInstances(Key<T> key) {        Instances<T> instances = new Instances<>(new Object[scopeInjectors.length]);        for (int i = 0; i < scopeInjectors.length; i ) {            instances.instances[i] = scopeInjectors[i].getInstance(key);        }        return instances;    }}

Как вы можете видеть, корень Инжектор просто "входит" в рабочую область N раз, поэтому мы имеем N Инжекторs с идентичными связями/графами зависимостей, но разными контейнерами экземпляров. Каждый раз, когда нам нужно создать несколько рабочих экземпляров, они создаются N раз каждым инжектором и возвращаются в виде вектора N экземпляров.

Примеры

note

Чтобы запустить примеры, необходимо клонировать ActiveJ с GitHub

git clone https://github.com/activej/activej

И импортируйте его как проект Maven. Посмотрите тег v5.4.3. Перед запуском примеров выполните сборку проекта. Эти примеры расположены по адресу activej/examples/core/boot.

Пример базового пула работников

Пример создания пула работников с 4 работниками:

public final class WorkerPoolModuleExample extends AbstractModule {  @Provides  WorkerPool workerPool(WorkerPools workerPools) {    return workerPools.createPool(4);  }
  @Provides  @Worker  String string(@WorkerId int workerId) {    return "Hello from worker #" + workerId;  }
  public static void main(String[] args) {    Injector injector = Injector.of(WorkerPoolModule.create(), new WorkerPoolModuleExample());    WorkerPool workerPool = injector.getInstance(WorkerPool.class);    WorkerPool.Instances<String> strings = workerPool.getInstances(String.class);    strings.forEach(System.out::println);  }}

Граф зависимостей примера включает созданный пул рабочих и выглядит следующим образом:

Dependency graphDependency graph

Полный текст примера смотрите на GitHub.

Многопоточные рабочие пулы Совместная работа

Несколько пулов рабочих могут совместно работать над выполнением одной задачи. В этом примере у нас есть 25 рабочих, и у каждого из них есть свой Eventloop. Эти Eventloops оборачиваются в Threads и затем добавляются в список threads. После этого происходит перестановка списка и запускаются потоки с задачами Eventloop. Задача состоит в том, чтобы поместить Eventloop id в ConcurrentLinkedQueue в соответствии с задержкой ( id умноженное на 100). Таким образом, мы получаем упорядоченную очередь идентификаторов Eventloop, после чего паркует Threads и очередь опустошается.

public final class MultithreadedWorkerCollab extends AbstractModule {
  @Provides  @Worker  Eventloop eventloop(@WorkerId int wid, ConcurrentLinkedQueue<Integer> queue) {    Eventloop eventloop = Eventloop.create();    eventloop.delay(100L * wid, () -> queue.add(wid));    return eventloop;  }
  @Provides  WorkerPool workerPool(WorkerPools workerPools) {    return workerPools.createPool(25);  }
  @Provides  ConcurrentLinkedQueue<Integer> queue() {    return new ConcurrentLinkedQueue<>();  }
  public static void main(String[] args) throws InterruptedException {    Injector injector = Injector.of(WorkerPoolModule.create(), new MultithreadedWorkerCollab());    WorkerPool workerPool = injector.getInstance(WorkerPool.class);    WorkerPool.Instances<Eventloop> eventloops = workerPool.getInstances(Eventloop.class);
    List<Thread> threads = new ArrayList<>();    for (Eventloop eventloop : eventloops.getList()) {      Thread thread = new Thread(eventloop);      threads.add(thread);    }
    Collections.shuffle(threads);    threads.forEach(Thread::start);
    for (Thread thread : threads) {      thread.join();    }
    ConcurrentLinkedQueue<Integer> queue = injector.getInstance(new Key<ConcurrentLinkedQueue<Integer>>() {});
    while (!queue.isEmpty()) {      System.out.println(queue.poll());    }
  }}

Полный текст примера смотрите на GitHub.