Examples
Note
To run the examples in an IDE, clone the ActiveJ project:
git clone https://github.com/activej/activej
Then import it as a Maven project and check out branch v5.2. Before running an example, build the project (Ctrl + F9 in IntelliJ IDEA).
Server Setup
Let's take a closer look at the Server Setup example. To make setup and launching as simple as possible, there is a special SimpleTcpServerLauncher, an ActiveJ Launcher implementation (an abstracted implementation of the main method). It makes applications easy to set up: to set up an FS server, you only need to override a few Launcher methods:
- onInit - runs prior to application start
- getOverrideModule - overrides the Launcher's default internal module definitions
- run - the Launcher's main method, represents business logic
The example below overrides onInit and run, and additionally overrides createConfig to set the storage path and the listen address. Then launch the Launcher:
public class ServerSetupExample extends SimpleTcpServerLauncher {
    private Path storage;

    @Override
    protected void onInit(Injector injector) throws Exception {
        storage = Files.createTempDirectory("server_storage");
    }

    @Override
    protected Config createConfig() {
        return super.createConfig()
                .with("activefs.path", storage.toString())
                .with("activefs.listenAddresses", "6732");
    }

    @Override
    protected void run() throws Exception {
        awaitShutdown();
    }

    public static void main(String[] args) throws Exception {
        Launcher launcher = new ServerSetupExample();
        launcher.launch(args);
    }
}
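Once the launcher is running, a quick way to confirm that the server accepts connections is to probe the configured port. The following is a minimal sketch that uses only the JDK. `PortProbe` and `isListening` are hypothetical names, not part of ActiveJ, and the port `6732` is taken from the configuration above. It only checks that a TCP connection can be opened, not that the FS protocol itself is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not part of ActiveJ): checks whether a TCP port accepts connections.
final class PortProbe {
    private PortProbe() {}

    // Returns true if a TCP connection to host:port can be opened within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // 6732 is the value of "activefs.listenAddresses" in the example above.
        boolean up = isListening("localhost", 6732, 1000);
        System.out.println(up ? "FS server is listening on 6732" : "Nothing is listening on 6732");
    }
}
```

Run it in a separate terminal while ServerSetupExample is up.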
File Upload
FileUploadExample also extends Launcher and thus implements the aforementioned Launcher methods.
In this example we will use an ActiveFs instance, which depends on the asynchronous ActiveJ Eventloop. To simplify working with dependencies, we will use the ActiveJ Inject DI library. It is lightning-fast, efficient and perfectly compatible with Launcher. So we simply @Inject two instances and define @Provides factory methods.
Just like in the previous example, we will also override the Launcher methods onInit, getOverrideModule, and run.
This example also uses the ActiveJ CSP component, in particular the ChannelFileReader class, which reads binary data from files asynchronously.
You can see the full example sources on GitHub; here we will consider only the upload process, which is defined in the overridden run method.
@Override
protected void run() throws Exception {
    ExecutorService executor = newSingleThreadExecutor();
    CompletableFuture<Void> future = eventloop.submit(() ->
            // consumer result here is a marker of it being successfully uploaded
            ChannelFileReader.open(executor, clientFile)
                    .then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))
                    .whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))
    );
    try {
        future.get();
    } finally {
        executor.shutdown();
    }
}
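The run method above follows a common pattern: hand the work to another thread, block the calling thread on a CompletableFuture, and release resources in a finally block. The sketch below shows the same pattern with the JDK only. It is an analogy, not ActiveJ code: a single-thread executor stands in for the event loop, and `SubmitAndWait` and `runAndWait` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// JDK-only analogy: a single-thread executor stands in for the event loop thread.
final class SubmitAndWait {
    private SubmitAndWait() {}

    // Runs the task on a dedicated thread and blocks the caller until it completes.
    static <T> T runAndWait(Supplier<T> task) {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<T> future = CompletableFuture.supplyAsync(task, loop);
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            // Surface the task's own exception as the cause.
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            // Always release the worker thread, even if the task failed.
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        String result = runAndWait(() -> "work done on " + Thread.currentThread().getName());
        System.out.println(result);
    }
}
```

Shutting the executor down in finally matters because its worker thread is not a daemon thread by default and would otherwise keep the JVM alive.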
File Download
FileDownloadExample is implemented similarly to the File Upload example. Here we will consider only the download process, which is defined in the overridden run method.
@Override
protected void run() throws Exception {
    ExecutorService executor = newSingleThreadExecutor();
    CompletableFuture<Void> future = eventloop.submit(() ->
            ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))
                    .streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))
                    .whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",
                            REQUIRED_FILE, clientStorage))
    );
    try {
        future.get();
    } finally {
        executor.shutdown();
    }
}
ActiveFs Decorator
Sometimes you may need to override or extend the default behavior of an ActiveFs implementation. To do so, you can use the Decorator pattern.
DecoratedActiveFsExample extends ServerSetupExample, so it inherits its DI bindings. First, we need to override the binding for ActiveFsServer to pass the decorated ActiveFs instead of the original one. To do so, we override the Launcher#getOverrideModule method and provide a new binding for ActiveFsServer that uses the decorated ActiveFs.
@Override
protected Module getOverrideModule() {
    return new AbstractModule() {
        @Eager
        @Provides
        ActiveFsServer activeFsServer(Eventloop eventloop, @Named("decorated") ActiveFs decoratedFs, Config config) {
            return ActiveFsServer.create(eventloop, decoratedFs)
                    .withInitializer(ofActiveFsServer(config.getChild("activefs")));
        }

        @Provides
        @Named("decorated")
        ActiveFs decoratedActiveFs(ActiveFs fs) {
            return new LoggingActiveFs(fs);
        }
    };
}
As you can see in the decoratedActiveFs(ActiveFs fs) method, we request the original ActiveFs and return the decorated one (wrapped in LoggingActiveFs).
private static final class LoggingActiveFs extends ForwardingActiveFs {
    private static final Logger logger = LoggerFactory.getLogger(LoggingActiveFs.class);

    public LoggingActiveFs(ActiveFs peer) {
        super(peer);
    }

    @Override
    public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {
        // forward the declared size as well, so the peer sees the same call
        return super.upload(name, size)
                .map(consumer -> {
                    logger.info("Starting upload of file: {}. File size is {} bytes", name, size);
                    return consumer
                            .withAcknowledgement(ack -> ack
                                    .whenResult(() -> logger.info("Upload of file {} finished", name)));
                });
    }

    @Override
    public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {
        return super.download(name, offset, limit)
                .map(supplier -> {
                    logger.info("Starting downloading file: {}", name);
                    return supplier
                            .withEndOfStream(eos -> eos
                                    .whenResult(() -> logger.info("Download of file {} finished", name)));
                });
    }
}
LoggingActiveFs extends ForwardingActiveFs, which simply delegates all ActiveFs method calls to an underlying ActiveFs instance. We override the methods we want to decorate (download, upload) and add custom log messages when an upload or download starts and finishes.
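The forwarding-plus-override structure described above can be shown in isolation with a small, self-contained sketch. Everything here is hypothetical and deliberately simplified: `Storage`, `InMemoryStorage`, `ForwardingStorage`, and `LoggingStorage` are illustrative stand-ins, not the ActiveFs API, and the calls are synchronous rather than Promise-based.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for a file-system interface (not the ActiveFs API).
interface Storage {
    void upload(String name, byte[] data);
    byte[] download(String name);
}

// In-memory implementation used as the decorated peer.
final class InMemoryStorage implements Storage {
    private final Map<String, byte[]> files = new HashMap<>();

    @Override
    public void upload(String name, byte[] data) {
        files.put(name, data.clone());
    }

    @Override
    public byte[] download(String name) {
        byte[] data = files.get(name);
        if (data == null) throw new IllegalArgumentException("No such file: " + name);
        return data.clone();
    }
}

// Delegates every call to the peer; subclasses override only what they decorate.
class ForwardingStorage implements Storage {
    private final Storage peer;

    ForwardingStorage(Storage peer) {
        this.peer = peer;
    }

    @Override
    public void upload(String name, byte[] data) {
        peer.upload(name, data);
    }

    @Override
    public byte[] download(String name) {
        return peer.download(name);
    }
}

// Records a message before and after each delegated call.
final class LoggingStorage extends ForwardingStorage {
    final List<String> log = new ArrayList<>();

    LoggingStorage(Storage peer) {
        super(peer);
    }

    @Override
    public void upload(String name, byte[] data) {
        log.add("Starting upload of file: " + name + ". File size is " + data.length + " bytes");
        super.upload(name, data);
        log.add("Upload of file " + name + " finished");
    }

    @Override
    public byte[] download(String name) {
        log.add("Starting downloading file: " + name);
        byte[] data = super.download(name);
        log.add("Download of file " + name + " finished");
        return data;
    }
}

final class DecoratorDemo {
    public static void main(String[] args) {
        LoggingStorage storage = new LoggingStorage(new InMemoryStorage());
        storage.upload("example.txt", "example text".getBytes(StandardCharsets.UTF_8));
        storage.download("example.txt");
        storage.log.forEach(System.out::println);
    }
}
```

Because the forwarding base class implements every method, a decorator only needs to override the calls it cares about, and callers keep depending on the interface alone.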
You can run FileUploadExample followed by FileDownloadExample. After this, you should see the following logging output:
INFO Starting upload of file: example.txt. File size is 12 bytes
INFO Upload of file example.txt finished
INFO Starting downloading file: example.txt
INFO Download of file example.txt finished
Cluster File Storage
With ActiveJ FS you can easily create a distributed cluster file storage with high fault tolerance. We will use Docker to launch three virtual servers and one client. The storage will support file uploads with automatic repartitioning according to the provided rule and replication count.
The first thing we need to do is to create a launcher class, ClusterTcpServerLauncher, for our server. Extend SimpleTcpServerLauncher to get all the required instances: ActiveFsServer, a local ActiveFs, an AsyncHttpServer for the GUI that simplifies working with your storage, and other helper instances. In ClusterTcpServerLauncher we only need to set up the utilities for repartitioning management: task schedulers, ClusterRepartitionController, and FsPartitions for tracking alive partitions and their statuses. The partitions communicate over TCP, while the GUI server uses HTTP.
@Provides
@Eager
@Named("repartition")
EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {
    return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)
            .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition")));
}

@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
    return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
            .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}

@Provides
ClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {
    String localPartitionId = first(partitions.getAllPartitions());
    assert localPartitionId != null;

    return ClusterRepartitionController.create(localPartitionId, partitions)
            .withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));
}

@Provides
DiscoveryService discoveryService(Eventloop eventloop, ActiveFs activeFs, Config config) throws MalformedDataException {
    return Initializers.constantDiscoveryService(eventloop, activeFs, config);
}

@Provides
FsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService, OptionalDependency<ServerSelector> serverSelector) {
    return FsPartitions.create(eventloop, discoveryService)
            .withServerSelector(serverSelector.orElse(RENDEZVOUS_HASH_SHARDER));
}
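The default server selector above is named RENDEZVOUS_HASH_SHARDER. To illustrate the general idea behind rendezvous (highest random weight) hashing, here is a minimal JDK-only sketch. It is an assumption-laden illustration, not ActiveJ's implementation: `RendezvousSketch`, `score`, and `select` are hypothetical names, and CRC32 is chosen only for brevity. Each partition gets a score for a given file name, and the file goes to the partitions with the highest scores.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical sketch of rendezvous hashing; not the ActiveJ implementation.
final class RendezvousSketch {
    private RendezvousSketch() {}

    // Score of a (partition, file) pair; any well-mixed hash would do.
    static long score(String partitionId, String fileName) {
        CRC32 crc = new CRC32();
        crc.update((partitionId + '\0' + fileName).getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    // Returns up to `replicas` partition ids with the highest scores for the file.
    static List<String> select(List<String> partitions, String fileName, int replicas) {
        List<String> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator
                .comparingLong((String p) -> score(p, fileName))
                .reversed()
                .thenComparing(Comparator.naturalOrder())); // deterministic tie-break
        return new ArrayList<>(sorted.subList(0, Math.min(replicas, sorted.size())));
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("server1", "server2", "server3");
        System.out.println(select(partitions, "example.txt", 2));
    }
}
```

The useful property is stability: removing a partition that was not selected for a file leaves that file's selection unchanged, so only files that lived on the removed partition need to move.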
Now we can move on to creating a client launcher, ClusterTcpClientLauncher. We need to provide ClusterRepartitionController and a task scheduler to detect dead partitions. Similarly to the server launcher, we need to provide an AsyncHttpServer for the GUI and FsPartitions for managing partitions. We also need an instance of the ClusterActiveFs class, an ActiveFs implementation that operates on other partitions as a cluster and provides redundancy and fail-safety capabilities.
@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
    return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
            .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}

@Provides
@Eager
AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
    return AsyncHttpServer.create(eventloop, servlet)
            .withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
}

@Provides
AsyncServlet guiServlet(ActiveFs activeFs) {
    return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");
}

@Provides
ActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {
    return ClusterActiveFs.create(partitions)
            .withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));
}

@Provides
DiscoveryService discoveryService(Eventloop eventloop, Config config) throws MalformedDataException {
    return Initializers.constantDiscoveryService(eventloop, config.getChild("activefs.cluster"));
}

@Provides
FsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService) {
    return FsPartitions.create(eventloop, discoveryService);
}
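Both launchers register a "clusterDeadCheck" scheduler that periodically calls partitions::checkDeadPartitions. The bookkeeping behind such a check can be sketched with the JDK alone. This is a hypothetical illustration, not how FsPartitions works internally: `PartitionMonitor`, `check`, and the `ping` predicate are assumed names, and the predicate stands in for whatever health probe a real cluster would use.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Hypothetical sketch: tracks which partitions answered the last health check.
final class PartitionMonitor {
    private final Map<String, Boolean> alive = new ConcurrentHashMap<>();
    private final Predicate<String> ping;

    // All partitions are assumed alive until the first check says otherwise.
    PartitionMonitor(Set<String> partitionIds, Predicate<String> ping) {
        this.ping = ping;
        for (String id : partitionIds) {
            alive.put(id, true);
        }
    }

    // One round of health checks; a partition can be marked dead and later alive again.
    void check() {
        for (String id : alive.keySet()) {
            alive.put(id, ping.test(id));
        }
    }

    Set<String> alivePartitions() {
        return filter(true);
    }

    Set<String> deadPartitions() {
        return filter(false);
    }

    // Collects the ids whose last recorded state matches `state`, in sorted order.
    private Set<String> filter(boolean state) {
        Set<String> result = new TreeSet<>();
        for (Map.Entry<String, Boolean> entry : alive.entrySet()) {
            if (entry.getValue() == state) {
                result.add(entry.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> unreachable = new HashSet<>(Arrays.asList("server2"));
        PartitionMonitor monitor = new PartitionMonitor(
                new TreeSet<>(Arrays.asList("server1", "server2", "server3")),
                id -> !unreachable.contains(id));
        monitor.check();
        System.out.println("alive: " + monitor.alivePartitions());
        System.out.println("dead: " + monitor.deadPartitions());
    }
}
```

In a long-running process, `monitor::check` could be passed to `ScheduledExecutorService.scheduleAtFixedRate` to repeat the check at a fixed interval, which is roughly the role the task scheduler plays in the launchers above.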
Here's the architecture of our distributed P2P storage:
You can create as many partitions as you wish.
To launch the example, run the following commands to create Docker images and build containers (run all of them in the activej/launchers/fs directory):
# building two images for server and client
docker build -t cluster-server -f ClusterServerDockerfile .
docker build -t cluster-client -f ClusterClientDockerfile .
# launching all the servers and client instances in background
docker-compose up -d
The containers will be built with the following configurations:
- Server1: TCP-connection port 9001, HTTP GUI port 8081
- Server2: TCP-connection port 9002, HTTP GUI port 8082
- Server3: TCP-connection port 9003, HTTP GUI port 8083
- Client: HTTP GUI port 8080
Use these commands to manage the containers:
# to stop a single container:
docker-compose stop server1
# to stop and remove all the containers:
docker-compose down
# to check containers status:
docker-compose ps