Workers
Overview
Workers (particularly WorkerPoolModule) are a convenient way to inject multiple dependencies of the same type. They are useful when you need, for example, the same set of dependencies for each thread. ActiveJ relies heavily on workers to implement multithreading. Workers are the core of ActiveJ's threading model.
ActiveJ's threading model
The primary goal of ActiveJ is to create a fast, scalable, easy-to-use, high-level asynchronous I/O programming model.
To achieve this goal, ActiveJ's design principles overcome the performance overhead and complexities of the traditional
multithreaded programming model, but still take full advantage of Java's multithreading capabilities.
ActiveJ offers a means of splitting the application into a Primary Reactor thread and Worker Reactor threads. These threads communicate with each other via message passing and thread-safe application-specific singleton services.
An implementation of a Reactor in ActiveJ is an Eventloop. An Eventloop thread is essentially a single-threaded mini-application (similar to Node.js) that handles its share of I/O tasks and executes Runnables submitted from other threads.
Primary Reactor threads distribute and balance I/O tasks between Worker threads.
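To make the idea of a reactor thread more concrete, here is a minimal sketch in plain Java. It is not ActiveJ's Eventloop: MiniReactor and its methods are hypothetical names, used only to illustrate a single thread that drains a queue of Runnables submitted from other threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical illustration only: NOT ActiveJ's Eventloop.
final class MiniReactor implements Runnable {
    // Poison-pill task that tells the loop to exit
    private static final Runnable STOP = () -> {};

    // Thread-safe inbox: the only point of contact with other threads
    private final BlockingQueue<Runnable> inbox = new LinkedBlockingQueue<>();

    // State touched only by the reactor thread while it runs, so it needs no locks
    final List<String> handled = new ArrayList<>();

    // May be called from any thread
    void submit(Runnable task) {
        inbox.add(task);
    }

    // May be called from any thread; the loop exits after earlier tasks finish
    void shutdown() {
        inbox.add(STOP);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = inbox.take();
                if (task == STOP) return;
                task.run(); // always executed on the reactor thread
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniReactor reactor = new MiniReactor();
        Thread thread = new Thread(reactor, "mini-reactor");
        thread.start();

        // Tasks are submitted from the main thread but run on the reactor thread
        for (int i = 0; i < 3; i++) {
            int n = i;
            reactor.submit(() -> reactor.handled.add(
                    "task " + n + " on " + Thread.currentThread().getName()));
        }
        reactor.shutdown();
        thread.join(); // after join, reading `handled` from this thread is safe

        reactor.handled.forEach(System.out::println);
    }
}
```

Because every task runs on the same thread, the code inside a task can mutate reactor-local state without synchronization; only the inbox itself has to be thread-safe.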
The benefits of the ActiveJ threading model:
- Each primary/worker reactor thread works as a single-threaded application, which is easy to program and reason about
- No multithreading overhead, races, or thread synchronization overhead
- The traditional power of Java in multithreaded programming is fully preserved:
  - A typical I/O load can easily be split between worker threads
  - The application can have thread-safe singleton services used by reactor threads, and a huge singleton data state shared between all worker threads
  - You can still use some thread synchronization / lock-free algorithms, just try to avoid excessive blocking of concurrent threads
  - Full compatibility with Java Threads, Thread Pools, Java Futures, and even blocking I/O operations
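The split into a primary thread and worker threads can be sketched with JDK classes alone. The sketch below is an illustration under stated assumptions, not ActiveJ code: each worker is modeled as a single-thread executor (so per-worker state needs no locks), the calling thread plays the primary role and hands out tasks round-robin, and an AtomicLong stands in for a thread-safe singleton service. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only: JDK executors stand in for reactor threads.
final class PrimaryWorkerSketch {

    // Distributes `tasks` tasks across `workers` single-threaded workers and
    // returns how many tasks each worker handled.
    static int[] distribute(int workers, int tasks, AtomicLong sharedCounter)
            throws InterruptedException {
        List<ExecutorService> workerThreads = new ArrayList<>();
        // Per-worker state: slot i is only ever written by worker i's thread
        int[] handledPerWorker = new int[workers];
        for (int i = 0; i < workers; i++) {
            workerThreads.add(Executors.newSingleThreadExecutor());
        }

        // The "primary" (here: the calling thread) balances tasks round-robin
        for (int task = 0; task < tasks; task++) {
            int workerId = task % workers;
            workerThreads.get(workerId).execute(() -> {
                handledPerWorker[workerId]++;    // worker-local, no synchronization
                sharedCounter.incrementAndGet(); // thread-safe shared singleton
            });
        }

        // Wait for all workers to drain their queues before reading the results
        for (ExecutorService worker : workerThreads) {
            worker.shutdown();
            worker.awaitTermination(10, TimeUnit.SECONDS);
        }
        return handledPerWorker;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong total = new AtomicLong();
        int[] perWorker = distribute(4, 10, total);
        for (int i = 0; i < perWorker.length; i++) {
            System.out.println("worker #" + i + " handled " + perWorker[i] + " tasks");
        }
        System.out.println("total = " + total.get());
    }
}
```

The only synchronized object in the sketch is the shared counter; everything else is confined to one thread at a time, which is the property the list above describes.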
Worker Scope
Problem
Such a design raises some implementation questions. For example, if we want to implement a multithreaded HTTP web application with worker reactors:
- according to these design principles, we need to create separate instances of a worker reactor, a single-threaded HTTP server, and its servlets for each worker thread
- but what if our application has 8 reactor threads with 10 worker-thread components inside each: do we have to create 80 components in total and assign them to each worker thread?
- how is it even possible to manually instantiate, wire, initialize, and start/stop all these components in the right order, and also gracefully shut down the application on start/stop errors?
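To give a feel for the bookkeeping these questions imply, here is a rough sketch of hand-written wiring and lifecycle management in plain Java. Everything in it is hypothetical (the Component class, the component names, the simulated start failure); it only illustrates starting components in order and rolling back in reverse order when one of them fails.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only: hand-written wiring and lifecycle management.
final class ManualWiringSketch {
    // Made-up component abstraction; real reactors/servers/servlets would go here
    static final class Component {
        final String name;
        final boolean failOnStart; // simulates a start error
        Component(String name, boolean failOnStart) {
            this.name = name;
            this.failOnStart = failOnStart;
        }
    }

    // One reactor, servlet and server per worker: 3 components x N workers
    static List<Component> wire(int workers, int failingWorker) {
        List<Component> all = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            all.add(new Component("reactor-" + i, false));
            all.add(new Component("servlet-" + i, false));
            all.add(new Component("server-" + i, i == failingWorker));
        }
        return all;
    }

    // Starts components in order; on failure, stops the already started ones
    // in reverse order. Returns the log of lifecycle events.
    static List<String> startAll(List<Component> components) {
        List<String> log = new ArrayList<>();
        Deque<Component> started = new ArrayDeque<>(); // used as a stack
        for (Component c : components) {
            if (c.failOnStart) {
                log.add("FAILED " + c.name);
                while (!started.isEmpty()) {
                    log.add("stop " + started.pop().name);
                }
                return log;
            }
            started.push(c);
            log.add("start " + c.name);
        }
        return log;
    }

    public static void main(String[] args) {
        // Two workers; the server of worker #1 fails to start
        startAll(wire(2, 1)).forEach(System.out::println);
    }
}
```

Even this toy version has to track start order and rollback by hand, and it ignores dependencies between components entirely.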
Solution
Fortunately, thanks to ActiveJ Inject, we have a solution: the @Worker scope. If you need to implement multiple worker threads:
- include the WorkerPoolModule module and create a WorkerPool instance
- annotate the components you want to put in each worker thread with the @Worker scope annotation
The WorkerPool will automatically instantiate identical dependency graphs for each of these worker threads.
You are by no means limited to the above scheme with one primary reactor and N worker reactors:
- you can still have completely unrelated / standalone reactors (neither primary nor worker)
- you can have several primary reactors sharing the same pool of worker reactors, or several worker pools with different numbers of threads
- you can even define your own @Worker annotations, and create multiple worker pools with completely unrelated and different dependency graphs
All this is done in fully transparent and easy-to-understand modules: just mark different components with the appropriate worker annotations and let the WorkerPool create all instances.
To automatically start/stop application components in the correct order, simply include the ServiceGraph module in your Launcher: it is aware of worker pools and will treat worker instances as special compound singleton-like instances.
For example, here is MultithreadedHttpServerLauncher, which features ServiceGraphModule, in use:
public final class MultithreadedHttpServerExample extends MultithreadedHttpServerLauncher {
    @Provides
    @Worker
    AsyncServlet servlet(@WorkerId int workerId) {
        return request -> HttpResponse.ok200()
                .withPlainText("Hello from worker server #" + workerId + "\n")
                .toPromise();
    }

    public static void main(String[] args) throws Exception {
        MultithreadedHttpServerExample example = new MultithreadedHttpServerExample();
        example.launch(args);
    }
}
And its dependency graph looks as follows:
To help you understand how worker pools work, here is a simplified WorkerPool implementation in a nutshell (the actual implementation differs, but not much):
public final class WorkerPool {
    private final Scope scope;
    private final Injector[] scopeInjectors;

    WorkerPool(Injector injector, Scope scope, int workers) {
        this.scope = scope;
        this.scopeInjectors = new Injector[workers];
        for (int i = 0; i < workers; i++) {
            scopeInjectors[i] = injector.enterScope(scope, new HashMap<>(), false);
        }
    }

    public <T> Instances<T> getInstances(Key<T> key) {
        Instances<T> instances = new Instances<>(new Object[scopeInjectors.length]);
        for (int i = 0; i < scopeInjectors.length; i++) {
            instances.instances[i] = scopeInjectors[i].getInstance(key);
        }
        return instances;
    }
}
As you can see, the root Injector simply ‘enters’ the worker scope N times, so we have N Injectors with identical bindings/dependency graphs, but different containers of instances. Each time we need to create some worker instances, they are created N times (once by each injector) and returned as a vector of N instances.
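The "same bindings, separate instance containers" idea can also be modeled without any DI library. The following is a minimal sketch under that assumption: MiniScopedPool and its bind/getInstances methods are hypothetical names, and a binding is reduced to a function from worker id to instance.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntFunction;

// Hypothetical illustration only: a toy model, not ActiveJ's WorkerPool.
final class MiniScopedPool {
    // One shared set of bindings: type -> factory taking the worker id
    private final Map<Class<?>, IntFunction<?>> bindings = new HashMap<>();

    // One instance container per worker ("entering the scope" N times)
    private final List<Map<Class<?>, Object>> containers = new ArrayList<>();

    MiniScopedPool(int workers) {
        for (int i = 0; i < workers; i++) {
            containers.add(new HashMap<>());
        }
    }

    <T> void bind(Class<T> type, IntFunction<? extends T> factory) {
        bindings.put(type, factory);
    }

    // Returns N instances, one from each worker's container; each container
    // creates its instance at most once and then returns the cached one.
    <T> List<T> getInstances(Class<T> type) {
        IntFunction<?> factory = bindings.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("No binding for " + type.getName());
        }
        List<T> result = new ArrayList<>();
        for (int i = 0; i < containers.size(); i++) {
            int workerId = i;
            Object instance = containers.get(i)
                    .computeIfAbsent(type, t -> factory.apply(workerId));
            result.add(type.cast(instance));
        }
        return result;
    }

    public static void main(String[] args) {
        MiniScopedPool pool = new MiniScopedPool(3);
        pool.bind(StringBuilder.class, workerId -> new StringBuilder("worker #" + workerId));

        List<StringBuilder> first = pool.getInstances(StringBuilder.class);
        List<StringBuilder> second = pool.getInstances(StringBuilder.class);

        first.forEach(System.out::println);
        System.out.println(first.get(0) == second.get(0)); // true: same worker, cached instance
        System.out.println(first.get(0) == first.get(1));  // false: different workers
    }
}
```

The bindings map plays the role of the shared dependency graph, while each per-worker map plays the role of one scope injector's instance container.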
Examples
To run the examples, you need to clone ActiveJ from GitHub:
git clone https://github.com/activej/activej
Then import it as a Maven project and check out the tag v6.0-beta2. Before running the examples, build the project.
These examples are located at activej/examples/core/boot.
Basic Worker Pool Example
An example of creating a worker pool with 4 workers:
public final class WorkerPoolModuleExample extends AbstractModule {
    @Provides
    WorkerPool workerPool(WorkerPools workerPools) {
        return workerPools.createPool(4);
    }

    @Provides
    @Worker
    String string(@WorkerId int workerId) {
        return "Hello from worker #" + workerId;
    }

    public static void main(String[] args) {
        Injector injector = Injector.of(WorkerPoolModule.create(), new WorkerPoolModuleExample());
        WorkerPool workerPool = injector.getInstance(WorkerPool.class);
        WorkerPool.Instances<String> strings = workerPool.getInstances(String.class);
        strings.forEach(System.out::println);
    }
}
The dependency graph of the example includes the created worker pool and looks as follows:
Multithreaded Worker Pools Collaboration
Several workers from a Worker Pool can work together on the same task. In this example, we have 25 Workers, and each of them has its own Eventloop. These Eventloops are wrapped in Threads and added to a list of threads. The list is then shuffled and the threads with Eventloop tasks are started. Each task puts its worker id into a ConcurrentLinkedQueue after a delay (the id multiplied by 100 milliseconds). In this way, we get a queue of ids ordered by delay, regardless of the order in which the threads were started. The main thread then waits for all threads to finish, and the queue is emptied and printed.
public final class MultithreadedWorkerCollab extends AbstractModule {
    @Provides
    @Worker
    Eventloop eventloop(@WorkerId int wid, ConcurrentLinkedQueue<Integer> queue) {
        Eventloop eventloop = Eventloop.create();
        eventloop.delay(100L * wid, () -> queue.add(wid));
        return eventloop;
    }

    @Provides
    WorkerPool workerPool(WorkerPools workerPools) {
        return workerPools.createPool(25);
    }

    @Provides
    ConcurrentLinkedQueue<Integer> queue() {
        return new ConcurrentLinkedQueue<>();
    }

    public static void main(String[] args) throws InterruptedException {
        Injector injector = Injector.of(WorkerPoolModule.create(), new MultithreadedWorkerCollab());
        WorkerPool workerPool = injector.getInstance(WorkerPool.class);
        WorkerPool.Instances<Eventloop> eventloops = workerPool.getInstances(Eventloop.class);

        List<Thread> threads = new ArrayList<>();
        for (Eventloop eventloop : eventloops.getList()) {
            Thread thread = new Thread(eventloop);
            threads.add(thread);
        }

        Collections.shuffle(threads);
        threads.forEach(Thread::start);

        for (Thread thread : threads) {
            thread.join();
        }

        ConcurrentLinkedQueue<Integer> queue = injector.getInstance(new Key<>() {});

        while (!queue.isEmpty()) {
            System.out.println(queue.poll());
        }
    }
}