ActiveJ | Creating a Cluster File Storage

Purpose

In this tutorial we will create a cluster storage with three servers that communicate via TCP protocol. There also will be a cluster client. Each server will have an HTTP GUI server for simplicity of use and debugging. The cluster storage will support file uploads with automatic repartitioning according to the provided rule and replication count.

What ActiveJ technologies will be used?

What is ActiveFS?

ActiveFS is one of the most important technologies in this tutorial. It provides a lightweight abstraction on top of common file operations like upload, download, append, list, copy, move, delete, and others. It allows operations with local, remote or distributed file storage. ActiveFS is a stand-alone technology of ActiveJ Java platform, it can be used independently of the platform.

Import the components to your project

Add all the required Maven dependencies to your project:

<dependencies>
   <dependency>
       <groupId>io.activej</groupId>
       <artifactId>activej-launchers-fs</artifactId>
       <version>2.2</version>
   </dependency>
   <dependency>
       <groupId>com.github.spullara.mustache.java</groupId>
       <artifactId>compiler</artifactId>
       <version>0.9.4</version>
   </dependency>
   <dependency>
       <groupId>ch.qos.logback</groupId>
       <artifactId>logback-classic</artifactId>
       <version>1.2.3</version>
   </dependency>
</dependencies>

activej-launchers-fs module will import all the required ActiveJ technologies that were mentioned before.

Set up Server Launcher

The first thing we need to do is to create a launcher class ClusterServerLauncher for our servers. We’ll need the following instances:

The partitions will communicate via TCP protocol, while GUI server will use HTTP protocol.

//TODO use code snippet from github
public final class ClusterServerLauncher extends Launcher {
    public static final Path DEFAULT_PATH = Paths.get(System.getProperty("java.io.tmpdir"), "fs-storage");

    @Provides
    public Eventloop eventloop() {
        return Eventloop.create();
    }

    @Provides
    Executor executor() {
        return Executors.newCachedThreadPool();
    }

    @Provides
    ActiveFs localActivefs(Eventloop eventloop, Executor executor, Config config) {
        return LocalActiveFs.create(eventloop, executor, config.get(ofPath(), "activefs.path", DEFAULT_PATH));
    }

    @Eager
    @Provides
    ActiveFsServer activeFsServer(Eventloop eventloop, ActiveFs activeFs, Config config) {
        return ActiveFsServer.create(eventloop, activeFs)
                .withInitializer(ofActiveFsServer(config.getChild("activefs")));
    }

    @Provides
    AsyncServlet guiServlet(ActiveFs fs, ClusterRepartitionController controller) {
        return ActiveFsGuiServlet.create(fs, "Cluster server [" + controller.getLocalPartitionId() + ']');
    }

    @Provides
    @Eager
    AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
        return AsyncHttpServer.create(eventloop, servlet)
                .withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
    }

    @Provides
    FsPartitions fsPartitions(Config config, Eventloop eventloop, ActiveFs fs) {
        Map<Object, ActiveFs> partitions = new LinkedHashMap<>();
        partitions.put(config.get("activefs.repartition.localPartitionId"), fs);

        return FsPartitions.create(eventloop, partitions)
                .withInitializer(ofFsPartitions(config.getChild("activefs.cluster")));
    }


    @Provides
    ClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {
        String localPartitionId = first(partitions.getAllPartitions());
        assert localPartitionId != null;

        return ClusterRepartitionController.create(localPartitionId, partitions)
                .withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));
    }

    @Provides
    @Eager
    @Named("repartition")
    EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {
        return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)
                .withInterval(Duration.ofSeconds(1));
    }

    @Provides
    @Eager
    @Named("clusterDeadCheck")
    EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
        return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
                .withInterval(Duration.ofSeconds(1));
    }

    @Provides
    Config config() {
        return Config.create()
                .with("activefs.listenAddresses", "*:9000")
                .with("activefs.http.gui.listenAddresses", "*:8080")
                .overrideWith(Config.ofClassPathProperties("activefs-server.properties", true))
                .overrideWith(Config.ofSystemProperties("config"));
    }

    @Override
    protected final Module getModule() {
        return combine(
                ServiceGraphModule.create(),
                ConfigModule.create()
                        .withEffectiveConfigLogger());
    }

    @Override
    protected void run() throws Exception {
        awaitShutdown();
    }

    public static void main(String[] args) throws Exception {
        new BasicClusterServerLauncher().launch(args);
    }
}

Client Launcher

Now we can move on to creating a client launcher ClusterTcpClientLauncher. We need to provide a task scheduler to detect dead partitions, AsyncHttpServer for GUI, remote ActiveFS, and FsPartitions for managing partitions. We also need an instance of ClusterActiveFs class, an ActiveFs implementation that operates on other partitions as a cluster and contains some redundancy and fail-tolerance capabilities.

public class ClusterTcpClientLauncher extends Launcher {
   public static final String PROPERTIES_FILE = "activefs-client.properties";

   public static final String DEFAULT_DEAD_CHECK_INTERVAL = "1 seconds";
   public static final String DEFAULT_GUI_SERVER_LISTEN_ADDRESS = "*:8080";

   @Provides
   Eventloop eventloop() {
       return Eventloop.create();
   }

   @Provides
   @Eager
   @Named("clusterDeadCheck")
   EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {
       return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)
               .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));
   }

   @Provides
   @Eager
   AsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {
       return AsyncHttpServer.create(eventloop, servlet)
               .withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));
   }

   @Provides
   AsyncServlet guiServlet(ActiveFs activeFs) {
       return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");
   }

   @Provides
   ActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {
       return ClusterActiveFs.create(partitions)
               .withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));
   }

   @Provides
   FsPartitions fsPartitions(Eventloop eventloop, Config config) {
       return FsPartitions.create(eventloop)
               .withInitializer(ofFsPartitions(config.getChild("activefs.cluster")));
   }

   @Provides
   Config config() {
       return createConfig()
               .overrideWith(Config.ofClassPathProperties(PROPERTIES_FILE, true))
               .overrideWith(Config.ofSystemProperties("config"));
   }

   protected Config createConfig(){
       return Config.create()
               .with("activefs.http.gui.listenAddresses", DEFAULT_GUI_SERVER_LISTEN_ADDRESS)
               .with("activefs.repartition.deadCheck.schedule.type", "interval")
               .with("activefs.repartition.deadCheck.schedule.value", DEFAULT_DEAD_CHECK_INTERVAL);
   }

   @Override
   protected final Module getModule() {
       return combine(
               ServiceGraphModule.create(),
               JmxModule.create(),
               ConfigModule.create()
                       .withEffectiveConfigLogger());
   }

   @Override
   protected void run() throws Exception {
       awaitShutdown();
   }

   public static void main(String[] args) throws Exception {
       new ClusterTcpClientLauncher().launch(args);
   }
}

Here’s the architecture of our distributed P2P storage:

You can create as many partitions as you wish, cluster client is optional as you can create an alternative client implementation that supports ActiveFS protocol.

Testing out the storage

Let’s launch three partitions on different ports. For this purpose you need to create three Run/Debug configurations for ClusterTcpServerLauncher and provide the following VM options:

-Dconfig.activefs.listenAddresses=*:9001
-Dconfig.activefs.http.gui.listenAddresses=*:8081
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
-Dconfig.activefs.cluster.replicationCount=2
-Dconfig.activefs.path=/tmp/server1
-Dconfig.activefs.repartition.localPartitionId=127.0.0.1:9001
-Dconfig.activefs.listenAddresses=*:9002
-Dconfig.activefs.http.gui.listenAddresses=*:8082
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
-Dconfig.activefs.cluster.replicationCount=2
-Dconfig.activefs.path=/tmp/server2
-Dconfig.activefs.repartition.localPartitionId=127.0.0.1:9002
-Dconfig.activefs.listenAddresses=*:9003
-Dconfig.activefs.http.gui.listenAddresses=*:8083
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
-Dconfig.activefs.cluster.replicationCount=2
-Dconfig.activefs.path=/tmp/server3
-Dconfig.activefs.repartition.localPartitionId=127.0.0.1:9003

Next, provide the following VM options for ClusterTcpClientLauncher:

-Dconfig.activefs.http.gui.listenAddresses=*:8080
-Dconfig.activefs.cluster.partitions=127.0.0.1:9001,127.0.0.1:9002,127.0.0.1:9003
The storage will run with the following configurations:
Server1: TCP-connection port 9001, HTTP GUI port 8081
Server2: TCP-connection port 9002, HTTP GUI port 8082
Server3: TCP-connection port 9003, HTTP GUI port 8083
Client: HTTP GUI port 8080

Launch all the created configurations and go to 127.0.0.1:8080 in your browser to work with the storage. Switch between ports (8081, 8082, 8083) to check your partitions’ GUI. You can try to upload files, create directories, kill some partitions to check repartitioning.