Workers

Overview

Workers (and WorkerPoolModule in particular) are a handy way to inject multiple instances of the same dependency type. They are useful when, for example, each thread needs its own identical set of dependencies. ActiveJ relies heavily on workers to implement multithreading: they are the foundation of the ActiveJ threading model.

ActiveJ threading model

The primary goal of ActiveJ is to provide a fast, scalable, easy-to-use, high-level asynchronous I/O programming model. To achieve this, ActiveJ is designed to avoid the performance overhead and complexity of the traditional multithreaded programming model while still making full use of Java's multithreading capabilities. ActiveJ lets you split the application into a Primary Eventloop thread and Worker Eventloop threads. These threads communicate with each other via message passing and thread-safe, application-specific singleton services.

An Eventloop thread is essentially a single-threaded mini-application (similar to Node.js) that handles its share of I/O tasks and executes Runnables submitted from other threads. Primary Eventloop threads distribute and balance I/O tasks among Worker threads.

graph TB
  id1(Primary Event Loop) --> id2(Worker Balancer)
  id2 --> id4
  subgraph Worker Scope
    id3(Event Loop) --> id4(HTTP Server)
  end
  id2 --> id6
  subgraph Worker Scope
    id5(Event Loop) --> id6(HTTP Server)
  end
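To make the "single-threaded mini-application" idea concrete, here is a minimal plain-Java sketch of a loop thread that executes Runnables submitted from other threads. It is only an illustration: `MiniEventloop`, `submit`, `shutdown`, and `demo` are hypothetical names, it handles no I/O, and it is not how ActiveJ's Eventloop is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical illustration; this is NOT ActiveJ's Eventloop. */
final class MiniEventloop implements Runnable {
    // Sentinel task that tells the loop to exit.
    private static final Runnable STOP = () -> {};

    // Thread-safe mailbox: the only point where other threads touch the loop.
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    /** Thread-safe: may be called from any thread. */
    void submit(Runnable task) {
        tasks.add(task);
    }

    /** Asks the loop to exit after all previously submitted tasks have run. */
    void shutdown() {
        tasks.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = tasks.take();
                if (task == STOP) return;
                task.run(); // always executed on the loop's own thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits taskCount tasks from the calling thread and returns the order in which they ran. */
    static List<Integer> demo(int taskCount) {
        MiniEventloop loop = new MiniEventloop();
        List<Integer> order = new ArrayList<>(); // written only by the loop thread, read after join()
        Thread thread = new Thread(loop, "mini-eventloop");
        thread.start();
        for (int i = 0; i < taskCount; i++) {
            int id = i;
            loop.submit(() -> order.add(id));
        }
        loop.shutdown();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

Because every task runs on the loop's own thread, the code inside a task needs no locks. The queue is the only synchronized hand-off point.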

The benefits of the ActiveJ threading model:

  • Each primary/worker Eventloop thread works as a single-threaded application, which is simple to program and to reason about
  • There is no multithreading overhead: no races and no thread synchronization costs
  • The traditional strength of Java in multithreaded programming is fully preserved:
      • typical I/O load can be easily split between worker threads
      • the application can have thread-safe singleton services, which are used by Eventloop threads, and a huge singleton data state shared among all worker threads
      • you can still use thread synchronization and lock-free algorithms, as long as you avoid excessive blocking of concurrent threads
      • there is full interoperability between Java Threads, Thread Pools, Java Futures, and even blocking I/O operations

Worker Scope

Problem

This design raises some implementation questions. For example, if we want to implement a multithreaded HTTP web application with worker eventloops:

  • according to these design principles, we need to create separate instances of a worker eventloop, a single-threaded HTTP server, and its servlets for each worker thread
  • but what if our application has 8 eventloop threads with 10 worker-thread components each? Do we have to create 80 components in total and assign each of them to its worker thread?
  • how can we manually instantiate, wire, initialize, and start/stop all those components in the correct order, and also gracefully shut down the application on start/stop errors?

Solution

Luckily, thanks to ActiveJ Inject, we have a solution: the @Worker scope. If you need to implement several worker threads:

  • include the WorkerPoolModule and create a WorkerPool instance
  • annotate the components you want in each worker thread with the @Worker scope annotation

WorkerPool will automatically instantiate identical dependency graphs for each of those worker threads.

You are by no means limited to the aforementioned scheme with one primary Eventloop and N worker eventloops:

  • you can still have completely unrelated, standalone eventloops (neither primary nor worker)
  • you can have several primary eventloops sharing the same pool of worker eventloops, or several worker pools with different numbers of threads
  • you can even define your own @Worker annotations and create multiple worker pools with completely unrelated and different dependency graphs

All of this is expressed in fully transparent and easy-to-understand modules: just mark different components with the appropriate worker annotations and let WorkerPool create all the instances.

To automatically start and stop application components in the correct order, simply include ServiceGraphModule in your Launcher. It is aware of worker pools and treats vectors of worker instances as special compound singleton-like instances.
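The idea of ordered start/stop with compound worker instances can be sketched in plain Java. This is a rough illustration under stated assumptions, not ActiveJ's service graph: `MiniServiceRunner`, its `Service` interface, and the `compound` and `logging` helpers are all hypothetical names, and the start order is passed in explicitly instead of being derived from a dependency graph.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration only; ActiveJ's service graph is far more elaborate. */
final class MiniServiceRunner {
    interface Service {
        void start() throws Exception;
        void stop();
    }

    // Services that started successfully, most recently started on top.
    private final Deque<Service> started = new ArrayDeque<>();

    /** Starts services in the given order; on failure stops the started ones and returns false. */
    boolean startAll(List<? extends Service> services) {
        for (Service service : services) {
            try {
                service.start();
                started.push(service);
            } catch (Exception e) {
                stopAll(); // roll back whatever is already running
                return false;
            }
        }
        return true;
    }

    /** Stops everything that was started, in reverse start order. */
    void stopAll() {
        while (!started.isEmpty()) {
            started.pop().stop();
        }
    }

    /** Wraps a vector of per-worker services so that it behaves like a single service. */
    static Service compound(List<? extends Service> workers) {
        MiniServiceRunner inner = new MiniServiceRunner();
        return new Service() {
            @Override
            public void start() {
                if (!inner.startAll(workers)) {
                    throw new IllegalStateException("a worker failed to start");
                }
            }

            @Override
            public void stop() {
                inner.stopAll();
            }
        };
    }

    /** Demo helper: a service that records its lifecycle events in a shared log. */
    static Service logging(String name, List<String> log, boolean failOnStart) {
        return new Service() {
            @Override
            public void start() {
                if (failOnStart) throw new IllegalStateException(name + " failed");
                log.add("start " + name);
            }

            @Override
            public void stop() {
                log.add("stop " + name);
            }
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniServiceRunner runner = new MiniServiceRunner();
        Service workers = compound(List.of(
                logging("worker-0", log, false),
                logging("worker-1", log, false)));
        // Dependencies (the workers) start before the component that depends on them.
        runner.startAll(List.of(workers, logging("primary", log, false)));
        runner.stopAll();
        System.out.println(log);
    }
}
```

In this sketch a failed start stops the already running services in reverse order, which is the kind of graceful handling of start errors that the text asks for.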

For example, here is how to use MultithreadedHttpServerLauncher, which includes ServiceGraphModule:

public final class MultithreadedHttpServerExample extends MultithreadedHttpServerLauncher {
    @Provides
    @Worker
    AsyncServlet servlet(@WorkerId int workerId) {
        return request -> HttpResponse.ok200()
                .withPlainText("Hello from worker server #" + workerId + "\n");
    }

    public static void main(String[] args) throws Exception {
        MultithreadedHttpServerExample example = new MultithreadedHttpServerExample();
        example.launch(args);
    }
}

And its dependency graph looks as follows:

graph
  subgraph "@Worker()"
    id1(AsyncHTTPServer) --> id2(EventLoop)
    id1 --> id3(Async Servlet)
    id3 --> id4("@WorkerId() int")
  end
  id2 -.-> ThrottlingController
  id2 --> id5(Config)
  id1 --> id5
  id9 --> id5
  id6(EventLoop) --> id5
  id7(Primary Server) --> id5
  id7 --> id6
  id7 --> id8(WorkerPool$Instances)
  id8 --> id9(WorkerPool)
  id9 --> id10(WorkerPools)
  id10 --> Injector

To help you understand how worker pools work, here is a simplified WorkerPool implementation in a nutshell (the actual implementation differs, but not much):

public final class WorkerPool {
    private final Scope scope;
    // One scoped injector per worker
    private final Injector[] scopeInjectors;

    WorkerPool(Injector injector, Scope scope, int workers) {
        this.scope = scope;
        this.scopeInjectors = new Injector[workers];
        for (int i = 0; i < workers; i++) {
            // Enter the worker scope once per worker
            scopeInjectors[i] = injector.enterScope(scope, new HashMap<>(), false);
        }
    }

    public <T> Instances<T> getInstances(Key<T> key) {
        Instances<T> instances = new Instances<>(new Object[scopeInjectors.length]);
        for (int i = 0; i < scopeInjectors.length; i++) {
            // Ask each scoped injector for its own instance of the key
            instances.instances[i] = scopeInjectors[i].getInstance(key);
        }
        return instances;
    }
}

As you can see, the root Injector simply ‘enters’ the worker scope N times, so we have N Injectors with identical bindings and dependency graphs, but different containers of instances. Whenever we request worker instances, each of the N injectors creates its own instance, and the result is returned as a vector of N instances.
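The "identical bindings, separate instance containers" idea can also be shown without any dependency injection library. The following plain-Java sketch is an assumption-laden illustration rather than ActiveJ code: `MiniWorkerPool` and `bind` are hypothetical names, keys are plain `Class` objects, and worker ids are numbered from 0 here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

/** Hypothetical illustration of per-worker instance containers; NOT ActiveJ's WorkerPool. */
final class MiniWorkerPool {
    // One shared set of "bindings": key -> factory that receives the worker id.
    private final Map<Class<?>, IntFunction<?>> bindings = new HashMap<>();
    // One private instance container per worker.
    private final List<Map<Class<?>, Object>> containers = new ArrayList<>();

    MiniWorkerPool(int workers) {
        for (int i = 0; i < workers; i++) {
            containers.add(new HashMap<>());
        }
    }

    /** Registers a factory that every worker uses to create its own instance. */
    <T> void bind(Class<T> key, IntFunction<T> factory) {
        bindings.put(key, factory);
    }

    /** Returns one instance per worker, creating each instance at most once. */
    <T> List<T> getInstances(Class<T> key) {
        IntFunction<?> factory = bindings.get(key);
        if (factory == null) {
            throw new IllegalArgumentException("No binding for " + key);
        }
        List<T> result = new ArrayList<>();
        for (int i = 0; i < containers.size(); i++) {
            int workerId = i;
            // Same binding for every worker, but each worker caches its own instance.
            Object instance = containers.get(i).computeIfAbsent(key, k -> factory.apply(workerId));
            result.add(key.cast(instance));
        }
        return result;
    }

    public static void main(String[] args) {
        MiniWorkerPool pool = new MiniWorkerPool(4);
        pool.bind(String.class, id -> "Hello from worker #" + id);
        pool.getInstances(String.class).forEach(System.out::println);
    }
}
```

Requesting the same key twice returns the same per-worker objects, which mirrors the way each scoped injector keeps its own container of instances.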

Examples

note

To run the examples, you need to clone ActiveJ from GitHub

git clone https://github.com/activej/activej

And import it as a Maven project. Check out tag

. Before running the examples, build the project. These examples are located at activej -> examples -> core -> boot

Basic Worker Pool Example

An example of creating a worker pool with 4 workers:

public final class WorkerPoolModuleExample extends AbstractModule {
    @Provides
    WorkerPool workerPool(WorkerPools workerPools) {
        return workerPools.createPool(4);
    }

    @Provides
    @Worker
    String string(@WorkerId int workerId) {
        return "Hello from worker #" + workerId;
    }

    public static void main(String[] args) {
        Injector injector = Injector.of(WorkerPoolModule.create(), new WorkerPoolModuleExample());
        WorkerPool workerPool = injector.getInstance(WorkerPool.class);
        WorkerPool.Instances<String> strings = workerPool.getInstances(String.class);
        strings.forEach(System.out::println);
    }
}

The dependency graph of the example includes the created worker pool and looks as follows:

[Figure: dependency graph]

Multithreaded Worker Pools Collaboration

Several workers can cooperate on a single task. In this example we have 25 workers, each with its own Eventloop. Each Eventloop is wrapped in a Thread and added to a list of threads. The list is then shuffled and the threads are started. Each Eventloop's task is to add its worker id to a shared ConcurrentLinkedQueue after a delay proportional to that id (the id multiplied by 100 milliseconds). As a result, the ids are expected to arrive in the queue in ascending order, regardless of the order in which the threads were started. Once the main thread has joined all the worker threads, the queue is drained and its contents are printed.

public final class MultithreadedWorkerCollab extends AbstractModule {
    @Provides
    @Worker
    Eventloop eventloop(@WorkerId int wid, ConcurrentLinkedQueue<Integer> queue) {
        Eventloop eventloop = Eventloop.create();
        eventloop.delay(100L * wid, () -> queue.add(wid));
        return eventloop;
    }

    @Provides
    WorkerPool workerPool(WorkerPools workerPools) {
        return workerPools.createPool(25);
    }

    @Provides
    ConcurrentLinkedQueue<Integer> queue() {
        return new ConcurrentLinkedQueue<>();
    }

    public static void main(String[] args) throws InterruptedException {
        Injector injector = Injector.of(WorkerPoolModule.create(), new MultithreadedWorkerCollab());
        WorkerPool workerPool = injector.getInstance(WorkerPool.class);
        WorkerPool.Instances<Eventloop> eventloops = workerPool.getInstances(Eventloop.class);

        List<Thread> threads = new ArrayList<>();
        for (Eventloop eventloop : eventloops.getList()) {
            Thread thread = new Thread(eventloop);
            threads.add(thread);
        }

        Collections.shuffle(threads);
        threads.forEach(Thread::start);

        for (Thread thread : threads) {
            thread.join();
        }

        ConcurrentLinkedQueue<Integer> queue = injector.getInstance(new Key<ConcurrentLinkedQueue<Integer>>() {});
        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }
}