
Examples

Note

To run the examples in an IDE, clone the ActiveJ project:

git clone https://github.com/activej/activej

Then import it as a Maven project and check out branch v5.4.3. Before running an example, build the project (Ctrl + F9 in IntelliJ IDEA).

Server Setup

Let's take a closer look at the Server Setup Example. To make setup and launching as easy as possible, there is a special SimpleTcpServerLauncher, an implementation of ActiveJ Launcher (abstracted implementation of main methods). It allows you to simply set up applications, so all you need to configure an FS server is to override a few Launcher methods:

  • onInit - runs prior to application start
  • getOverrideModule - overrides Launcher's default internal module definitions
  • run - Launcher's main method, represents business logic

Then launch the Launcher:

public class ServerSetupExample extends SimpleTcpServerLauncher {
  private Path storage;

  @Override
  protected void onInit(Injector injector) throws Exception {
    storage = Files.createTempDirectory("server_storage");
  }

  @Override
  protected Config createConfig() {
    return super.createConfig()
        .with("activefs.path", storage.toString())
        .with("activefs.listenAddresses", "6732");
  }

  @Override
  protected void run() throws Exception {
    awaitShutdown();
  }

  public static void main(String[] args) throws Exception {
    Launcher launcher = new ServerSetupExample();
    launcher.launch(args);
  }
}

See full example on GitHub
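To make the lifecycle idea concrete, here is a minimal sketch of the template-method shape that a launcher with onInit and run hooks suggests. MiniLauncher and MiniServer are hypothetical names invented for this illustration. They are not ActiveJ classes, and the real Launcher does considerably more (dependency injection, service startup and shutdown).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for a launcher. NOT the ActiveJ Launcher class.
abstract class MiniLauncher {
    // Records lifecycle steps so the order can be inspected afterwards.
    final List<String> events = new ArrayList<>();

    // Hook that runs before the application starts; does nothing by default.
    protected void onInit() {
    }

    // Hook that holds the application's business logic.
    protected abstract void run();

    // Template method: the order of the lifecycle steps is fixed here.
    public final void launch(String[] args) {
        events.add("init");
        onInit();
        events.add("run");
        try {
            run();
        } finally {
            events.add("stop");
        }
    }
}

// A subclass only fills in the hooks, much like ServerSetupExample above.
class MiniServer extends MiniLauncher {
    String storage;

    @Override
    protected void onInit() {
        storage = "server_storage";
    }

    @Override
    protected void run() {
        events.add("serving from " + storage);
    }
}
```

Calling new MiniServer().launch(new String[0]) runs the hooks in the fixed order init, run, stop, which is the property a subclass relies on when it prepares state in onInit and uses it in run.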

File Upload

FileUploadExample also extends Launcher and thus implements the above Launcher methods.

In this example, we will use an ActiveFs instance that depends on the asynchronous ActiveJ Eventloop. To simplify working with dependencies, we will use the ActiveJ Inject DI library, which is fast and fully compatible with Launcher. So we simply @Inject two instances and add @Provides factory methods. As in the previous example, we will also override Launcher's methods onInit, getOverrideModule, and run.

Also, this example uses the ActiveJ CSP component, specifically the ChannelFileReader class. It allows asynchronously reading binary data from files.

You can see the full example sources on GitHub; here we will consider only the upload process, which is defined in the overridden run method.

@Override
protected void run() throws Exception {
  ExecutorService executor = newSingleThreadExecutor();
  CompletableFuture<Void> future = eventloop.submit(() ->
      // consumer result here is a marker of it being successfully uploaded
      ChannelFileReader.open(executor, clientFile)
          .then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))
          .whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))
  );
  try {
    future.get();
  } finally {
    executor.shutdown();
  }
}

See full example on GitHub
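The run method above follows a common shape: hand the asynchronous work to another thread, block the calling thread on a CompletableFuture, and shut the executor down in a finally block. The sketch below reproduces that shape with JDK classes only. It is an analogy under stated assumptions: a plain single-thread executor stands in for the Eventloop, Files.readAllBytes stands in for ChannelFileReader, and the class and method names (BlockingBridgeSketch, readAsync, roundTrip) are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogy; no ActiveJ classes are used here.
class BlockingBridgeSketch {

    // Reads the whole file on the given executor and completes the future with its text.
    static CompletableFuture<String> readAsync(ExecutorService executor, Path file) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, executor);
    }

    // Writes a temp file, reads it back off-thread, and blocks until the result is ready.
    static String roundTrip(String text) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Path file = Files.createTempFile("client_file", ".txt");
            try {
                Files.write(file, text.getBytes(StandardCharsets.UTF_8));
                // join() blocks the caller, like future.get() in the example above
                return readAsync(executor, file).join();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            // always release the worker thread, even if reading failed
            executor.shutdown();
        }
    }
}
```

Shutting the executor down in finally matters because a non-daemon worker thread would otherwise keep the JVM alive after the work has finished or failed.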

File Download

FileDownloadExample has an implementation similar to the File Upload example. Here we will only look at the download process, which is defined in the overridden run() method.

@Override
protected void run() throws Exception {
  ExecutorService executor = newSingleThreadExecutor();
  CompletableFuture<Void> future = eventloop.submit(() ->
      ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))
          .streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))
          .whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",
              REQUIRED_FILE, clientStorage))
  );
  try {
    future.get();
  } finally {
    executor.shutdown();
  }
}

See full example on GitHub

ActiveFs Decorator

Sometimes you may need to override or extend the default behavior of an ActiveFs implementation. You can use the Decorator pattern to do this, and the DecoratedActiveFsExample demonstrates how. It decorates the ActiveFs implementation by adding extra log messages for file uploads and downloads.

DecoratedActiveFsExample extends ServerSetupExample, so it inherits its DI bindings. First, we need to override the binding for ActiveFsServer to pass the decorated ActiveFs instead of the original one. To do this, we will override the Launcher#getOverrideModule method and provide a new binding for ActiveFsServer that uses the decorated ActiveFs.

@Override
protected Module getOverrideModule() {
  return new AbstractModule() {
    @Eager
    @Provides
    ActiveFsServer activeFsServer(Eventloop eventloop, @Named("decorated") ActiveFs decoratedFs, Config config) {
      return ActiveFsServer.create(eventloop, decoratedFs)
          .withInitializer(ofActiveFsServer(config.getChild("activefs")));
    }

    @Provides
    @Named("decorated")
    ActiveFs decoratedActiveFs(ActiveFs fs) {
      return new LoggingActiveFs(fs);
    }
  };
}

As you can see, in the decoratedActiveFs(ActiveFs fs) method we request the original ActiveFs and return the decorated one (wrapped in LoggingActiveFs).

private static final class LoggingActiveFs extends ForwardingActiveFs {
  private static final Logger logger = LoggerFactory.getLogger(LoggingActiveFs.class);

  public LoggingActiveFs(ActiveFs peer) {
    super(peer);
  }

  @Override
  public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {
    // delegate to the size-aware overload so the expected size is not dropped
    return super.upload(name, size)
        .map(consumer -> {
          logger.info("Starting upload of file: {}. File size is {} bytes", name, size);
          return consumer
              .withAcknowledgement(ack -> ack
                  .whenResult(() -> logger.info("Upload of file {} finished", name)));
        });
  }

  @Override
  public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {
    return super.download(name, offset, limit)
        .map(supplier -> {
          logger.info("Starting downloading file: {}", name);
          return supplier
              .withEndOfStream(eos -> eos
                  .whenResult(() -> logger.info("Download of file {} finished", name)));
        });
  }
}

LoggingActiveFs extends ForwardingActiveFs, which simply delegates all ActiveFs method calls to some underlying ActiveFs instance. We override the methods we want to decorate (download, upload) and add custom logging messages when upload/download starts and finishes.

You can run FileUploadExample followed by FileDownloadExample. After this, you should see the following logging output:

INFO Starting upload of file: example.txt. File size is 12 bytes
INFO Upload of file example.txt finished
INFO Starting downloading file: example.txt
INFO Download of file example.txt finished

See full example on GitHub
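Stripped of promises and channels, the forwarding-plus-override structure used by LoggingActiveFs can be sketched in plain Java. Everything below is hypothetical and exists only for illustration: TinyFs is a made-up synchronous interface standing in for ActiveFs, and the log is collected in a list rather than sent to a logger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal file-store interface, standing in for ActiveFs.
interface TinyFs {
    void upload(String name, String content);

    String download(String name);
}

// In-memory implementation that plays the role of the "original" store.
class InMemoryTinyFs implements TinyFs {
    private final Map<String, String> files = new HashMap<>();

    @Override
    public void upload(String name, String content) {
        files.put(name, content);
    }

    @Override
    public String download(String name) {
        return files.get(name);
    }
}

// Delegates every call to a peer; subclasses override only what they decorate.
class ForwardingTinyFs implements TinyFs {
    private final TinyFs peer;

    ForwardingTinyFs(TinyFs peer) {
        this.peer = peer;
    }

    @Override
    public void upload(String name, String content) {
        peer.upload(name, content);
    }

    @Override
    public String download(String name) {
        return peer.download(name);
    }
}

// Adds log lines around upload and download, kept in a list for inspection.
class LoggingTinyFs extends ForwardingTinyFs {
    final List<String> log = new ArrayList<>();

    LoggingTinyFs(TinyFs peer) {
        super(peer);
    }

    @Override
    public void upload(String name, String content) {
        log.add("Starting upload of file: " + name);
        super.upload(name, content);
        log.add("Upload of file " + name + " finished");
    }

    @Override
    public String download(String name) {
        log.add("Starting downloading file: " + name);
        String content = super.download(name);
        log.add("Download of file " + name + " finished");
        return content;
    }
}
```

Because callers depend only on the interface, wrapping the original store as new LoggingTinyFs(new InMemoryTinyFs()) changes behavior without touching either the callers or the wrapped implementation.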

Cluster File Storage

With ActiveJ FS you can easily create a distributed cluster file storage with high fault tolerance. We will use Docker to launch three virtual servers and one client. The storage will support file uploads with automatic repartitioning according to a given rule and number of replicas.

The first thing we need to do is to create a ClusterTcpServerLauncher class for our server. Extend SimpleTcpServerLauncher to get all the necessary instances: ActiveFsServer, a local ActiveFs, an AsyncHttpServer for the GUI (which simplifies working with your storage), and other helper instances. In ClusterTcpServerLauncher, we only need to configure the utilities for repartitioning management: the task schedulers, ClusterRepartitionController, and FsPartitions, which tracks alive partitions and their statuses. The partitions will communicate over TCP, while the GUI server will use HTTP.

@Provides
@Eager
@Named("repartition")
EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {
  return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)
      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition")));
}

@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
  return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}

@Provides
ClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {
  String localPartitionId = first(partitions.getAllPartitions());
  assert localPartitionId != null;

  return ClusterRepartitionController.create(localPartitionId, partitions)
      .withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));
}

@Provides
DiscoveryService discoveryService(Eventloop eventloop, ActiveFs activeFs, Config config) throws MalformedDataException {
  return Initializers.constantDiscoveryService(eventloop, activeFs, config);
}

@Provides
FsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService, OptionalDependency<ServerSelector> serverSelector) {
  return FsPartitions.create(eventloop, discoveryService)
      .withServerSelector(serverSelector.orElse(RENDEZVOUS_HASH_SHARDER));
}
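The fsPartitions binding above falls back to RENDEZVOUS_HASH_SHARDER when no ServerSelector is bound. As a rough illustration of the general rendezvous (highest-random-weight) hashing idea, and not of ActiveJ's actual implementation, the sketch below scores every partition against a file name and keeps the top-scoring ones as replicas. The class name, the FNV-1a hash, and the key format are all assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of rendezvous hashing; not taken from ActiveJ sources.
class RendezvousSketch {

    // 64-bit FNV-1a hash; any well-mixed hash would do for this illustration.
    static long fnv1a(String s) {
        long h = 0xcbf29ce484222325L;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);
            h *= 0x100000001b3L;
        }
        return h;
    }

    // Scores each partition for the file and returns the `replicas` highest-scoring ids.
    static List<String> select(Collection<String> partitions, String fileName, int replicas) {
        List<String> sorted = new ArrayList<>(partitions);
        sorted.sort(Comparator
                .comparingLong((String id) -> fnv1a(id + ":" + fileName))
                .reversed()
                // break score ties by id so the result is deterministic
                .thenComparing(Comparator.<String>naturalOrder()));
        return new ArrayList<>(sorted.subList(0, Math.min(replicas, sorted.size())));
    }
}
```

The useful property is that each partition's score depends only on that partition and the file name, so adding or removing an unrelated partition does not move files whose chosen replicas are unaffected.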

Now we can move on to creating the ClusterTcpClientLauncher class. We need to provide the ClusterRepartitionController and a task scheduler to detect dead partitions. Similar to the server launcher, we need to provide an AsyncHttpServer for the GUI and FsPartitions for partition management. We also need an instance of the ClusterActiveFs class, an implementation of ActiveFs that works with other partitions as a cluster and provides redundancy and fault tolerance features.

@Provides
@Eager
@Named("clusterDeadCheck")
EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
  return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
}

@Provides
@Eager
AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
  return AsyncHttpServer.create(eventloop, servlet)
      .withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
}

@Provides
AsyncServlet guiServlet(ActiveFs activeFs) {
  return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");
}

@Provides
ActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {
  return ClusterActiveFs.create(partitions)
      .withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));
}

@Provides
DiscoveryService discoveryService(Eventloop eventloop, Config config) throws MalformedDataException {
  return Initializers.constantDiscoveryService(eventloop, config.getChild("activefs.cluster"));
}

@Provides
FsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService) {
  return FsPartitions.create(eventloop, discoveryService);
}
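Both launchers schedule a periodic dead-partition check. The sketch below is only a guess at the general bookkeeping such a check implies, not a description of how FsPartitions works internally: partitions that fail are moved to a dead set, and a periodic check pings the dead ones and revives those that respond. PartitionTrackerSketch and the ping predicate are hypothetical.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical alive/dead bookkeeping; not the ActiveJ FsPartitions class.
class PartitionTrackerSketch {
    private final Set<String> alive = new TreeSet<>();
    private final Set<String> dead = new TreeSet<>();

    PartitionTrackerSketch(Set<String> partitionIds) {
        alive.addAll(partitionIds);
    }

    // Called when a request to a partition fails; unknown ids are ignored.
    void markDead(String id) {
        if (alive.remove(id)) {
            dead.add(id);
        }
    }

    // Pings every dead partition and moves the responsive ones back to alive.
    void checkDeadPartitions(Predicate<String> ping) {
        dead.removeIf(id -> {
            boolean responded = ping.test(id);
            if (responded) {
                alive.add(id);
            }
            return responded;
        });
    }

    // Defensive copies so callers cannot mutate the tracker's state.
    Set<String> alive() {
        return new TreeSet<>(alive);
    }

    Set<String> dead() {
        return new TreeSet<>(dead);
    }
}
```

In a JDK-only setting, a method like checkDeadPartitions could be driven by ScheduledExecutorService.scheduleAtFixedRate, which plays a role loosely comparable to the task scheduler bound above.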

Here's the architecture of our distributed P2P storage:

[Figure: Distributed P2P storage]

You can create as many partitions as you wish.

To launch the example, run the following commands to create the Docker images and start the containers (run all of them from the activej/launchers/fs directory):

# building two images for server and client
docker build -t cluster-server -f ClusterServerDockerfile .
docker build -t cluster-client -f ClusterClientDockerfile .
# launching all the servers and client instances in background
docker-compose up -d

The containers will be built with the following configurations:

  • Server1: TCP-connection port 9001, HTTP GUI port 8081
  • Server2: TCP-connection port 9002, HTTP GUI port 8082
  • Server3: TCP-connection port 9003, HTTP GUI port 8083
  • Client: HTTP GUI port 8080

Use these commands to manage the containers:

# to stop a single container:
docker-compose stop server1
# to stop all the containers:
docker-compose down
# check containers status:
docker-compose ps

See full example on GitHub