Examples
To run the examples in an IDE, clone the ActiveJ project:
git clone https://github.com/activej/activej
Then import it as a Maven project and check out the v6.0-beta2 branch. Before running an example, build the project (Ctrl + F9 in IntelliJ IDEA).
Server Setup
Let's take a closer look at the Server Setup Example. To make setup and launching as easy as possible, there is a special SimpleTcpServerLauncher, an implementation of ActiveJ Launcher (an abstracted implementation of the main method). It simplifies application setup, so all you need to do to configure an FS server is override a few Launcher methods:
- onInit - runs prior to application start
- createConfig - overrides SimpleTcpServerLauncher's method and adds custom configuration
- run - Launcher's main method, represents business logic
Then launch the Launcher:
public class ServerSetupExample extends SimpleTcpServerLauncher {
    private Path storage;

    @Override
    protected void onInit(Injector injector) throws Exception {
        storage = Files.createTempDirectory("server_storage");
    }

    @Override
    protected Config createConfig() {
        return super.createConfig()
            .overrideWith(
                Config.create()
                    .with("fs.path", storage.toString())
                    .with("fs.listenAddresses", "6732")
            );
    }

    @Override
    protected void run() throws Exception {
        awaitShutdown();
    }

    public static void main(String[] args) throws Exception {
        Launcher launcher = new ServerSetupExample();
        launcher.launch(args);
    }
}
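The example above relies on an ordering: storage is assigned in onInit and read in createConfig, so onInit has to run first. The following is a minimal, library-free sketch of that template-method lifecycle. MiniLauncher and MiniServer are hypothetical names invented for illustration, the "9000" default is an assumption, and none of this is ActiveJ's actual Launcher implementation.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins; none of these types are part of ActiveJ.
class LifecycleSketch {

    /** Template-method base class: launch() fixes the order of the hooks. */
    static abstract class MiniLauncher {
        final List<String> calls = new ArrayList<>();
        Map<String, String> config;

        protected void onInit() throws Exception {
            calls.add("onInit");
        }

        protected Map<String, String> createConfig() {
            calls.add("createConfig");
            Map<String, String> defaults = new LinkedHashMap<>();
            defaults.put("fs.listenAddresses", "9000"); // assumed default, for illustration only
            return defaults;
        }

        protected abstract void run() throws Exception;

        public final void launch() throws Exception {
            onInit();                // 1. prepare state the config depends on
            config = createConfig(); // 2. build defaults plus overrides
            run();                   // 3. business logic
        }
    }

    static final class MiniServer extends MiniLauncher {
        private Path storage;

        @Override
        protected void onInit() throws Exception {
            super.onInit();
            storage = Files.createTempDirectory("server_storage");
        }

        @Override
        protected Map<String, String> createConfig() {
            Map<String, String> config = super.createConfig();
            config.put("fs.path", storage.toString()); // would throw if onInit had not run yet
            config.put("fs.listenAddresses", "6732");   // overrides the default value
            return config;
        }

        @Override
        protected void run() {
            calls.add("run");
        }
    }

    public static void main(String[] args) throws Exception {
        MiniServer server = new MiniServer();
        server.launch();
        System.out.println(server.calls);
        System.out.println(server.config.get("fs.listenAddresses"));
    }
}
```

The subclass only fills in the hooks, while the base class owns the call order. That is the property the real example depends on when it reads storage inside createConfig.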
File Upload
FileUploadExample also extends Launcher and thus implements the Launcher methods described above.
In this example, we will use an IFileSystem instance that depends on the asynchronous ActiveJ Eventloop. To simplify working with dependencies, we will use the ActiveJ Inject DI library. It is fast, efficient, and fully compatible with Launcher. So we simply @Inject two instances and add @Provides factory methods.
As in the previous example, we will also override Launcher's methods: onInit, getModule, and run.
This example also uses the ActiveJ CSP component, specifically the ChannelFileReader class, which reads binary data from files asynchronously.
You can see the full example sources on GitHub; here we will consider only the upload process, which is defined in the overridden run method.
@Override
protected void run() throws Exception {
    ExecutorService executor = newSingleThreadExecutor();
    CompletableFuture<Void> future = reactor.submit(() ->
        // consumer result here is a marker of it being successfully uploaded
        ChannelFileReader.open(executor, clientFile)
            .then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))
            .whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))
    );
    try {
        future.get();
    } finally {
        executor.shutdown();
    }
}
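The run method above follows a submit-and-wait shape: the work is handed to another thread, the calling thread blocks on a CompletableFuture, and the helper executor is shut down in a finally block whether or not the transfer succeeded. Here is a minimal sketch of that shape using only the JDK. SubmitAndWaitSketch and its upload method are hypothetical names, and a plain file copy stands in for the streaming upload, so this is not how ActiveJ moves the data.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper; a local file copy stands in for the remote upload.
class SubmitAndWaitSketch {

    /** Copies source to target on a worker thread and returns the number of bytes copied. */
    static long upload(Path source, Path target) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
            try {
                byte[] data = Files.readAllBytes(source);
                Files.write(target, data);
                return (long) data.length;
            } catch (IOException e) {
                // Surfaces to the caller as the cause of an ExecutionException
                throw new UncheckedIOException(e);
            }
        }, executor);
        try {
            return future.get(); // block the calling thread until the copy completes
        } finally {
            executor.shutdown(); // always release the worker thread
        }
    }
}
```

If the worker fails, future.get() throws an ExecutionException, and the finally block still shuts the executor down so the JVM can exit.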
File Download
FileDownloadExample has an implementation similar to the File Upload example. Here we will only look at the download process, which is defined in the overridden run() method.
@Override
protected void run() throws Exception {
    ExecutorService executor = newSingleThreadExecutor();
    CompletableFuture<Void> future = reactor.submit(() ->
        ChannelSuppliers.ofPromise(client.download(REQUIRED_FILE))
            .streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))
            .whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",
                REQUIRED_FILE, clientStorage))
    );
    try {
        future.get();
    } finally {
        executor.shutdown();
    }
}
FileSystem Decorator
Sometimes you may need to override or extend the default behavior of an IFileSystem implementation. You can use the Decorator pattern to do this.
The DecoratedFileSystemExample demonstrates how to do just that. It decorates the IFileSystem implementation by adding extra logging for file uploads and downloads.
DecoratedFileSystemExample extends ServerSetupExample, so it inherits its DI bindings. First, we need to override the binding for FileSystemServer so that it receives the decorated IFileSystem instead of the original one. To do this, we override the Launcher#getOverrideModule method and provide a new binding for FileSystemServer that uses the decorated IFileSystem.
@Override
protected Module getOverrideModule() {
    return new AbstractModule() {
        @Eager
        @Provides
        FileSystemServer fileSystemServer(NioReactor reactor, @Named("decorated") IFileSystem decoratedFS, Config config) {
            return FileSystemServer.builder(reactor, decoratedFS)
                .initialize(ofFileSystemServer(config.getChild("fs")))
                .build();
        }

        @Provides
        @Named("decorated")
        IFileSystem decoratedFileSystem(IFileSystem fs) {
            return new LoggingFileSystem(fs);
        }
    };
}
As you can see, in the decoratedFileSystem(IFileSystem fs) method we request the original IFileSystem and return the decorated one (wrapped in LoggingFileSystem).
private static final class LoggingFileSystem extends ForwardingFileSystem {
    private static final Logger logger = LoggerFactory.getLogger(LoggingFileSystem.class);

    public LoggingFileSystem(IFileSystem peer) {
        super(peer);
    }

    @Override
    public Promise<ChannelConsumer<ByteBuf>> upload(String name, long size) {
        // Forward the declared size as well, so the underlying file system sees the same call
        return super.upload(name, size)
            .map(consumer -> {
                logger.info("Starting upload of file: {}. File size is {} bytes", name, size);
                return consumer
                    .withAcknowledgement(ack -> ack
                        .whenResult(() -> logger.info("Upload of file {} finished", name)));
            });
    }

    @Override
    public Promise<ChannelSupplier<ByteBuf>> download(String name, long offset, long limit) {
        return super.download(name, offset, limit)
            .map(supplier -> {
                logger.info("Starting downloading file: {}", name);
                return supplier
                    .withEndOfStream(eos -> eos
                        .whenResult(() -> logger.info("Download of file {} finished", name)));
            });
    }
}
LoggingFileSystem extends ForwardingFileSystem, which simply delegates all IFileSystem method calls to an underlying IFileSystem instance. We override the methods we want to decorate (download, upload) and add custom log messages when an upload or download starts and finishes.
With DecoratedFileSystemExample running, you can run FileUploadExample followed by FileDownloadExample. After this, you should see the following logging output:
INFO Starting upload of file: example.txt. File size is 12 bytes
INFO Upload of file example.txt finished
INFO Starting downloading file: example.txt
INFO Download of file example.txt finished
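The forwarding-plus-override structure described in this section can also be shown without any ActiveJ types. The sketch below is a simplified, synchronous analogue: Storage, InMemoryStorage, ForwardingStorage, and LoggingStorage are hypothetical names made up for illustration, and log lines are collected in a list rather than written through a logger. The real LoggingFileSystem hooks into asynchronous acknowledgement and end-of-stream events instead.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, synchronous analogue of the decorator shown above; not ActiveJ API.
class DecoratorSketch {

    interface Storage {
        void upload(String name, byte[] data);
        byte[] download(String name);
    }

    /** The "original" implementation that gets decorated. */
    static final class InMemoryStorage implements Storage {
        private final Map<String, byte[]> files = new HashMap<>();

        @Override
        public void upload(String name, byte[] data) {
            files.put(name, data.clone());
        }

        @Override
        public byte[] download(String name) {
            byte[] data = files.get(name);
            if (data == null) throw new IllegalArgumentException("No such file: " + name);
            return data.clone();
        }
    }

    /** Delegates every call to the wrapped instance; subclasses override only what they need. */
    static class ForwardingStorage implements Storage {
        private final Storage peer;

        ForwardingStorage(Storage peer) {
            this.peer = peer;
        }

        @Override
        public void upload(String name, byte[] data) {
            peer.upload(name, data);
        }

        @Override
        public byte[] download(String name) {
            return peer.download(name);
        }
    }

    /** Adds log entries around the delegated calls. */
    static final class LoggingStorage extends ForwardingStorage {
        final List<String> log = new ArrayList<>();

        LoggingStorage(Storage peer) {
            super(peer);
        }

        @Override
        public void upload(String name, byte[] data) {
            log.add("Starting upload of file: " + name + ". File size is " + data.length + " bytes");
            super.upload(name, data);
            log.add("Upload of file " + name + " finished");
        }

        @Override
        public byte[] download(String name) {
            log.add("Starting downloading file: " + name);
            byte[] data = super.download(name);
            log.add("Download of file " + name + " finished");
            return data;
        }
    }
}
```

Because callers only see the Storage interface, swapping InMemoryStorage for new LoggingStorage(new InMemoryStorage()) changes no calling code, which mirrors what the overriding binding achieves for FileSystemServer.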
Cluster File Storage
With ActiveJ FS you can easily create a distributed cluster file storage with high fault tolerance. We will use Docker to launch three virtual servers and one client. The storage will support file uploads with automatic repartitioning according to a given rule and number of replicas.
The first thing we need to do is to create a ClusterTcpServerLauncher class for our server. Extend SimpleTcpServerLauncher to get all the necessary instances: FileSystemServer, a local IFileSystem, an HttpServer for the GUI, which simplifies working with your storage, and other helper instances. In ClusterTcpServerLauncher, we only need to configure the utilities for repartitioning management, such as task schedulers, ClusterRepartitionController, and FileSystemPartitions, which tracks alive partitions and their statuses. The partitions will communicate over TCP, while the GUI server will use HTTP.
@Provides
@Eager
@Named("repartition")
TaskScheduler repartitionScheduler(ClusterRepartitionController controller, Config config) {
    return TaskScheduler.builder(controller.getReactor(), controller::repartition)
        .initialize(ofTaskScheduler(config.getChild("fs.repartition")))
        .build();
}

@Provides
@Eager
@Named("clusterDeadCheck")
TaskScheduler deadCheckScheduler(Config config, FileSystemPartitions partitions) {
    return TaskScheduler.builder(partitions.getReactor(), partitions::checkDeadPartitions)
        .initialize(ofTaskScheduler(config.getChild("fs.repartition.deadCheck")))
        .build();
}

@Provides
ClusterRepartitionController repartitionController(
    Reactor reactor, FileSystemServer localServer, FileSystemPartitions partitions, Config config
) {
    String localPartitionId = first(partitions.getAllPartitions());
    assert localPartitionId != null;
    return ClusterRepartitionController.builder(reactor, localPartitionId, partitions)
        .initialize(ofClusterRepartitionController(config.getChild("fs.repartition")))
        .build();
}

@Provides
IDiscoveryService discoveryService(
    NioReactor reactor, IFileSystem fileSystem, Config config
) throws MalformedDataException {
    return Initializers.constantDiscoveryService(reactor, fileSystem, config);
}

@Provides
FileSystemPartitions fileSystemPartitions(
    Reactor reactor, IDiscoveryService discoveryService, OptionalDependency<ServerSelector> serverSelector
) {
    return FileSystemPartitions.builder(reactor, discoveryService)
        .withServerSelector(serverSelector.orElse(RENDEZVOUS_HASH_SHARDER))
        .build();
}
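The default server selector above is named RENDEZVOUS_HASH_SHARDER, which suggests rendezvous (highest-random-weight) hashing as the rule for placing files on partitions. The sketch below illustrates that general technique only. RendezvousSketch, score, and select are hypothetical names, and the FNV-1a hash and the tie-breaking by partition id are assumptions; ActiveJ's actual selector may differ in all of these details.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustration of rendezvous (highest-random-weight) hashing; not ActiveJ's implementation.
class RendezvousSketch {

    /** 64-bit FNV-1a hash over "partitionId NUL fileName" (hash choice is an assumption). */
    static long score(String partitionId, String fileName) {
        long hash = 0xcbf29ce484222325L;
        for (byte b : (partitionId + '\u0000' + fileName).getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;
        }
        return hash;
    }

    /** Returns the partitions with the highest scores for the file, one per replica. */
    static List<String> select(List<String> partitions, String fileName, int replicas) {
        List<String> sorted = new ArrayList<>(partitions);
        sorted.sort((a, b) -> {
            int byScore = Long.compare(score(b, fileName), score(a, fileName)); // descending score
            return byScore != 0 ? byScore : a.compareTo(b);                     // deterministic tie-break
        });
        return new ArrayList<>(sorted.subList(0, Math.min(replicas, sorted.size())));
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("server1", "server2", "server3");
        System.out.println(select(partitions, "example.txt", 2));
    }
}
```

Each partition's score for a file depends only on that partition and the file name, so removing a partition leaves every other chosen replica in place. Only the files that lived on the removed partition need a new home, which keeps repartitioning work small when a server dies.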
Now we can move on to creating the ClusterTcpClientLauncher class. We need to provide the ClusterRepartitionController and a task scheduler to detect dead partitions. Similar to the server launcher, we need to provide an HttpServer for the GUI and FileSystemPartitions for partition management. We also need an instance of the ClusterFileSystem class, an implementation of IFileSystem that works with other partitions as a cluster and provides redundancy and fault tolerance features.
@Provides
@Eager
@Named("clusterDeadCheck")
TaskScheduler deadCheckScheduler(Config config, FileSystemPartitions partitions) {
    return TaskScheduler.builder(partitions.getReactor(), partitions::checkDeadPartitions)
        .initialize(ofTaskScheduler(config.getChild("fs.repartition.deadCheck")))
        .build();
}

@Provides
@Eager
HttpServer guiServer(NioReactor reactor, AsyncServlet servlet, Config config) {
    return HttpServer.builder(reactor, servlet)
        .initialize(ofHttpServer(config.getChild("fs.http.gui")))
        .build();
}

@Provides
AsyncServlet guiServlet(Reactor reactor, IFileSystem fileSystem) {
    return FileSystemGuiServlet.create(reactor, fileSystem, "Cluster FS Client");
}

@Provides
IFileSystem fileSystem(Reactor reactor, FileSystemPartitions partitions, Config config) {
    return ClusterFileSystem.builder(reactor, partitions)
        .initialize(ofClusterFileSystem(config.getChild("fs.cluster")))
        .build();
}

@Provides
IDiscoveryService discoveryService(NioReactor reactor, Config config) throws MalformedDataException {
    return Initializers.constantDiscoveryService(reactor, config);
}

@Provides
FileSystemPartitions fileSystemPartitions(Reactor reactor, IDiscoveryService discoveryService) {
    return FileSystemPartitions.create(reactor, discoveryService);
}
Here's the architecture of our distributed P2P storage:
You can create as many partitions as you wish.
To launch the example, run the following commands to create Docker images and build containers (run all of them from the activej/launchers/fs directory):
# building two images for server and client
docker build -t cluster-server -f ClusterServerDockerfile .
docker build -t cluster-client -f ClusterClientDockerfile .
# launching all the servers and client instances in background
docker-compose up -d
The containers will be built with the following configurations:
- Server1: TCP-connection port 9001, HTTP GUI port 8081
- Server2: TCP-connection port 9002, HTTP GUI port 8082
- Server3: TCP-connection port 9003, HTTP GUI port 8083
- Client: HTTP GUI port 8080
Use these commands to manage containers:
# to stop a single container:
docker-compose stop server1
# to stop all the containers:
docker-compose down
# check containers status:
docker-compose ps