Skip to main content

Workers

Overview

Workers (particularly WorkerPoolModule) is a convenient way to inject multiple dependencies of the same type. It is useful when you need, for example, the same set of dependencies for each thread. ActiveJ relies heavily on workers to implement multithreading. Workers are the core of ActiveJ's threading model.

ActiveJ's threading model

The primary goal of ActiveJ is to create a fast, scalable, easy-to-use, and high-abstraction level I/O async programming model. To achieve this goal, ActiveJ's design principles overcome the performance overhead and complexities of the traditional multithreaded programming model, but still take full advantage of Java's multithreading capabilities. ActiveJ offers means of splitting the application into a Primary Reactor thread and Worker Reactor threads. These threads communicate with each other via message passing and thread-safe application-specific singleton services.

An implementation of a Reactor in ActiveJ is an Eventloop. Eventloop thread is essentially a single-threaded mini-application (similar to Node.js) that handles its share of I/O tasks and executes Runnables submitted from other threads. Primary Reactor threads distribute and balance I/O tasks between Worker threads.

graph TB id1(Primary Reactor) --> id2(Worker Balancer) id2 --> id4 subgraph Worker Scope id3(Reactor) --> id4(HTTP Server) end id2 --> id6 subgraph Worker Scope id5(Reactor) --> id6(HTTP Server) end

The benefits of ActiveJ threading model:

  • Each primary/worker reactor thread works as a single-threaded application, which is easy to program and reason about
  • No multithreaded overhead, races, and thread synchronization overhead
  • The traditional power of Java in multithreaded programming is fully preserved:
  • A typical I/O load can easily be split between worker threads
  • The application can have thread-safe singleton services used by reactor threads, and a huge singleton data state shared between all worker threads
  • You can still use some thread synchronization / lock-free algorithms, just try to avoid excessive blocking of concurrent threads
  • Full compatibility with Java Threads, Thread Pools, Java Futures, and even blocking I/O operations

Worker Scope

Problem

Such a design raises some implementation questions. For example, if we want to implement a multithreaded HTTP web application with worker reactors:

  • according to these design principles, we need to create separate instances of a worker reactor, a single-threaded HTTP server, and its servlets for each worker thread
  • but what if our application has 8 reactor threads with 10 worker-thread components inside, do we have to create 80 of components in total and assign them to each worker thread?
  • how is it even possible to manually instantiate, wire, initialize, and start/stop all these components in the right order, and also gracefully shutdown application on start/stop errors?

Solution

Fortunately, thanks to ActiveJ Inject, we have a solution: the @Worker scope. If you need to implement multiple worker threads:

  • include WorkerPoolModule module and create a WorkerPool instance
  • annotate the components you want to put in each worker thread with the @Worker scope annotation

The WorkerPool will automatically instantiate identical dependency graphs for each of these worker threads You are by no means limited to the above scheme with one primary reactor and N worker reactors:

  • you can still have completely unrelated / standalone reactors (nor primary, neither worker)
  • several primary reactors sharing the same pool of worker reactors or several sets of worker pools with different number or threads
  • you can even define your own @Worker annotations, and create multiple worker pools with completely unrelated and different dependency graphs All this is in fully transparent and easy-to-understand modules - just mark different components with the appropriate worker annotations and let the WorkerPool create all instances

To automatically start/stop application components in the correct order, simply include ServiceGraph module into your Launcher - it is aware of worker pools and will treat worker instances as special compound singleton-like instances.

For example, here is an example of utilizing MultithreadedHttpServerLauncher which features ServiceGraphModule:

public final class MultithreadedHttpServerExample extends MultithreadedHttpServerLauncher {
@Provides
@Worker
AsyncServlet servlet(@WorkerId int workerId) {
return request -> HttpResponse.ok200()
.withPlainText("Hello from worker server #" + workerId + "\n")
.toPromise();
}

public static void main(String[] args) throws Exception {
MultithreadedHttpServerExample example = new MultithreadedHttpServerExample();
example.launch(args);
}
}

And its dependency graph looks as follows:

graph subgraph "@Worker()" id1(HTTPServer) --> id2(Reactor) id1 --> id3(Async Servlet) id3 --> id4("@WorkerId() int") end id2 -.-> ThrottlingController id2 --> id5(Config) id1 --> id5 id9 --> id5 id6(Reactor) --> id5 id7(Primary Server) --> id5 id7 --> id6 id7 --> id8(WorkerPool$Instances) id8 --> id9(WorkerPool) id9 --> id10(WorkerPools) id10 --> Injector

To help you understand how worker pools work, here is a simplified WorkerPool implementation in a nutshell (the actual implementation differs, but not much):

public final class WorkerPool {
private final Scope scope;
private final Injector[] scopeInjectors;

WorkerPool(Injector injector, Scope scope, int workers) {
this.scope = scope;
this.scopeInjectors = new Injector[workers];
for (int i = 0; i < workers; i++) {
scopeInjectors[i] = injector.enterScope(scope, new HashMap<>(), false);
}
}

public <T> Instances<T> getInstances(Key<T> key) {
Instances<T> instances = new Instances<>(new Object[scopeInjectors.length]);
for (int i = 0; i < scopeInjectors.length; i++) {
instances.instances[i] = scopeInjectors[i].getInstance(key);
}
return instances;
}
}

As you can see, the root Injector simply ‘enters’ the worker scope N times, so we have N Injectors with identical bindings/dependency graphs, but different containers of instances. Each time we need to create some worker instances, they are created N times by each injector and returned as a vector of N instances.

Examples

note

To run the examples, you need to clone ActiveJ from GitHub

git clone https://github.com/activej/activej

And import it as a Maven project. Check out tag v6.0-beta2. Before running the examples, build the project. These examples are located at activej/examples/core/boot

Basic Worker Pool Example

An example of creating a worker pool with 4 workers:

public final class WorkerPoolModuleExample extends AbstractModule {
@Provides
WorkerPool workerPool(WorkerPools workerPools) {
return workerPools.createPool(4);
}

@Provides
@Worker
String string(@WorkerId int workerId) {
return "Hello from worker #" + workerId;
}

public static void main(String[] args) {
Injector injector = Injector.of(WorkerPoolModule.create(), new WorkerPoolModuleExample());
WorkerPool workerPool = injector.getInstance(WorkerPool.class);
WorkerPool.Instances<String> strings = workerPool.getInstances(String.class);
strings.forEach(System.out::println);
}
}

The dependency graph of the example includes the created worker pool and looks as follows:

Dependency graphDependency graph

See full example on GitHub

Multithreaded Worker Pools Collaboration

Several Worker Pools can work together on a same task. In this example, we have 25 Workers, and each of them has its own Eventloop. These Eventloops are wrapped in Threads and then added to the list of threads. The list is then shuffled and threads with Eventloop tasks start. The task is to put the Eventloop id in the ConcurrentLinkedQueue in accordance to the delay (id multiplied by 100). In this way, we get an ordered queue of Eventloop ids, after that the Threads are parked and the queue is emptied.

public final class MultithreadedWorkerCollab extends AbstractModule {

@Provides
@Worker
Eventloop eventloop(@WorkerId int wid, ConcurrentLinkedQueue<Integer> queue) {
Eventloop eventloop = Eventloop.create();
eventloop.delay(100L * wid, () -> queue.add(wid));
return eventloop;
}

@Provides
WorkerPool workerPool(WorkerPools workerPools) {
return workerPools.createPool(25);
}

@Provides
ConcurrentLinkedQueue<Integer> queue() {
return new ConcurrentLinkedQueue<>();
}

public static void main(String[] args) throws InterruptedException {
Injector injector = Injector.of(WorkerPoolModule.create(), new MultithreadedWorkerCollab());
WorkerPool workerPool = injector.getInstance(WorkerPool.class);
WorkerPool.Instances<Eventloop> eventloops = workerPool.getInstances(Eventloop.class);

List<Thread> threads = new ArrayList<>();
for (Eventloop eventloop : eventloops.getList()) {
Thread thread = new Thread(eventloop);
threads.add(thread);
}

Collections.shuffle(threads);
threads.forEach(Thread::start);

for (Thread thread : threads) {
thread.join();
}

ConcurrentLinkedQueue<Integer> queue = injector.getInstance(new Key<>() {});

while (!queue.isEmpty()) {
System.out.println(queue.poll());
}

}
}

See full example on GitHub