Перейти к основному содержанию

Примеры

note

Чтобы запустить примеры в IDE, необходимо клонировать проект ActiveJ:

git clone https://github.com/activej/activej

И импортируйте его как проект Maven. Посмотрите ветку v5.4.3. Перед запуском примера выполните сборку проекта (Ctrl F9 для IntelliJ IDEA).

Настройка сервера

Рассмотрим подробнее пример настройки сервера ***. Чтобы максимально упростить настройку и запуск, существует специальный SimpleTcpServerLauncher, ActiveJ Launcher реализация (абстрактная реализация основных* методов). Он позволяет просто устанавливать приложения, поэтому все, что вам нужно для настройки FS-сервера - это переопределить несколько методов Launcher:

  • onInit - выполняется перед запуском приложения
  • getOverrideModule - переопределяет определения внутренних модулей Launcher по умолчанию.
  • run - основной метод Launcher, представляет бизнес-логику.

Затем запустите Launcher

public class ServerSetupExample extends SimpleTcpServerLauncher {  private Path storage;
  @Override  protected void onInit(Injector injector) throws Exception {    storage = Files.createTempDirectory("server_storage");  }
  @Override  protected Config createConfig() {    return super.createConfig()        .with("activefs.path", storage.toString())        .with("activefs.listenAddresses", "6732");  }
  @Override  protected void run() throws Exception {    awaitShutdown();  }
  public static void main(String[] args) throws Exception {    Launcher launcher = new ServerSetupExample();    launcher.launch(args);  }}

Полный текст примера смотрите на GitHub.

Загрузка файлов

FileUploadExample также расширяет Launcher и, таким образом, реализует вышеупомянутые Launcher методы. В этом примере мы будем использовать ActiveFs экземпляр, который зависит от асинхронного ActiveJ Eventloop Для упрощения работы с зависимостями мы будем использовать ActiveJ Inject DI библиотеку. Он молниеносен, эффективен и отлично совместим с Launcher. Поэтому мы просто @Inject два экземпляра и @Provides заводские методы. Как и в предыдущем примере, мы также перезапишем Launcher методы onInit, getOverrideModule, и run. Также в данном примере используется компонент ActiveJ CSP , в частности ChannelFileReader класс. Он позволяет асинхронно считывать двоичные данные из файлов. Полные исходные тексты примеров вы можете посмотреть на GitHub, здесь мы рассмотрим только процесс выгрузки, который определен в перезаписанном методе run.

@Overrideprotected void run() throws Exception {  ExecutorService executor = newSingleThreadExecutor();  CompletableFuture<Void> future = eventloop.submit(() ->      // consumer result here is a marker of it being successfully uploaded      ChannelFileReader.open(executor, clientFile)          .then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))          .whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))  );  try {    future.get();  } finally {    executor.shutdown();  }}

Полный текст примера смотрите на GitHub.

Загрузка файлов

FileDownloadExample имеет реализацию, аналогичную примеру File Upload . Здесь мы рассмотрим только процесс загрузки, который определен в перезаписанном методе run.

@Overrideprotected void run() throws Exception {  ExecutorService executor = newSingleThreadExecutor();  CompletableFuture<Void> future = eventloop.submit(() ->      ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))          .streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))          .whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",              REQUIRED_FILE, clientStorage))  );  try {    future.get();  } finally {    executor.shutdown();  }}

Полный текст примера смотрите на GitHub.

Декоратор ActiveFs

Иногда вам может понадобиться переопределить/расширить поведение по умолчанию в реализации ActiveFs . Для этого можно воспользоваться шаблоном Decorator . На сайте DecoratedActiveFsExample показано, как это сделать. Он украшает реализацию ActiveFs , добавляя дополнительные журналы для загрузки и выгрузки файлов. DecoratedActiveFsExample расширяет ServerSetupExample , поэтому он наследует его DI привязки. Во-первых, нам нужно переопределить привязку для ActiveFsServer , чтобы передать декорированный ActiveFs вместо оригинального. Для этого мы переопределим метод Launcher#getOverrideModule и предоставим новую привязку для AsyncFsServer , которая использует декорированный ActiveFs.

@Overrideprotected Module getOverrideModule() {  return new AbstractModule() {    @Eager    @Provides    ActiveFsServer activeFsServer(Eventloop eventloop, @Named("decorated") ActiveFs decoratedFs, Config config) {      return ActiveFsServer.create(eventloop, decoratedFs)          .withInitializer(ofActiveFsServer(config.getChild("activefs")));    }
    @Provides    @Named("decorated")    ActiveFs decoratedActiveFs(ActiveFs fs) {      return new LoggingActiveFs(fs);    }  };}

Как видно из метода decoratedActiveFs(ActiveFs fs) , мы запрашиваем исходный ActiveFs и возвращаем декорированный (обернутый в LoggingActiveFs).

private static final class LoggingActiveFs extends ForwardingActiveFs {  private static final Logger logger = LoggerFactory.getLogger(LoggingActiveFs.class);
  public LoggingActiveFs(ActiveFs peer) {    super(peer);  }
  @Override  public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {    return super.upload(name)        .map(consumer -> {          logger.info("Starting upload of file: {}. File size is {} bytes", name, size);          return consumer              .withAcknowledgement(ack -> ack                  .whenResult(() -> logger.info("Upload of file {} finished", name)));        });  }
  @Override  public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {    return super.download(name, offset, limit)        .map(supplier -> {          logger.info("Starting downloading file: {}", name);          return supplier              .withEndOfStream(eos -> eos                  .whenResult(() -> logger.info("Download of file {} finished", name)));        });
  }}

LoggingActiveFs расширяет ForwardingActiveFs , который просто делегирует все вызовы методов ActiveFs некоторому базовому экземпляру ActiveFs . Мы переопределяем методы, которые хотим украсить (download, upload) и добавляем пользовательские сообщения логирования, когда upload/download начинается и заканчивается. Вы можете запустить FileUploadExample , затем FileDownloadExample После этого вы должны увидеть вывод логов:

INFO Начало загрузки файла: example.txt. Размер файла 12 байтINFO Загрузка файла example.txt завершенаINFO Начало загрузки файла: example.txtINFO Загрузка файла example.txt завершена

Полный текст примера смотрите на GitHub.

Кластерное файловое хранилище

С помощью ActiveJ FS можно просто создать распределенное кластерное файловое хранилище с высокой отказоустойчивостью. Мы будем использовать Docker для запуска трех виртуальных серверов и одного клиента. Хранилище будет поддерживать загрузку файлов с автоматической переразметкой в соответствии с заданным правилом и количеством репликаций.

Первое, что нам нужно сделать, это создать класс launcher ClusterTcpServerLauncher для нашего сервера. Разместить SimpleTcpServerLauncher чтобы получить все необходимые экземпляры: ActiveFsServer, местный ActiveFS, AsyncHttpServer для графического интерфейса, который упростит работу с вашим хранилищем, и другие вспомогательные экземпляры. В ClusterTcpServerLauncher нам понадобится настроить только утилиты для управления переразбиением, такие как планировщики задач, ClusterRepartitionControllerи FsPartitions для отслеживания живых разделов и их состояния. Разделы будут взаимодействовать по протоколу TCP, а сервер GUI будет использовать HTTP.

@Provides@Eager@Named("repartition")EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {  return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition")));}
@Provides@Eager@Named("clusterDeadCheck")EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {  return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));}
@ProvidesClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {  String localPartitionId = first(partitions.getAllPartitions());  assert localPartitionId != null;
  return ClusterRepartitionController.create(localPartitionId, partitions)      .withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));}
@ProvidesDiscoveryService discoveryService(Eventloop eventloop, ActiveFs activeFs, Config config) throws MalformedDataException {  return Initializers.constantDiscoveryService(eventloop, activeFs, config);}
@ProvidesFsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService, OptionalDependency<ServerSelector> serverSelector) {  return FsPartitions.create(eventloop, discoveryService)      .withServerSelector(serverSelector.orElse(RENDEZVOUS_HASH_SHARDER));}

Теперь мы можем перейти к созданию программы запуска клиента ClusterTcpClientLauncher. Нам нужно предоставить ClusterRepartitionController и планировщик задач для обнаружения мертвых разделов. Аналогично программе запуска сервера, нам нужно предоставить AsyncHttpServer для GUI и FsPartitions для управления разделами. Нам также нужен экземпляр класса ClusterActiveFs , реализация ActiveFs , которая работает на других разделах как кластер и содержит некоторые возможности резервирования и отказоустойчивости.

@Provides@Eager@Named("clusterDeadCheck")EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {  return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));}
@Provides@EagerAsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {  return AsyncHttpServer.create(eventloop, servlet)      .withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));}
@ProvidesAsyncServlet guiServlet(ActiveFs activeFs) {  return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");}
@ProvidesActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {  return ClusterActiveFs.create(partitions)      .withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));}
@ProvidesDiscoveryService discoveryService(Eventloop eventloop, Config config) throws MalformedDataException {  return Initializers.constantDiscoveryService(eventloop, config.getChild("activefs.cluster"));}
@ProvidesFsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService) {  return FsPartitions.create(eventloop, discoveryService);}

Вот архитектура нашего распределенного P2P-хранилища:

Distributed P2P storageDistributed P2P storage

Вы можете создать столько разделов, сколько пожелаете.

Чтобы запустить пример, выполните следующие скрипты для создания образов Docker и сборки контейнеров (запустите все скрипты в каталоге activej/launchers/fs):

# создание двух образов для сервера и клиентаdocker build -t cluster-server -f ClusterServerDockerfile .docker build -t cluster-client -f ClusterClientDockerfile .# запуск всех серверов и клиентских экземпляров в фоновом режимеdocker-compose up -d

Контейнеры будут построены со следующими конфигурациями:

  • Сервер1: порт TCP-соединения 9001, порт HTTP GUI 8081
  • Server2: порт TCP-соединения 9002, порт HTTP GUI 8082
  • Server3: порт TCP-соединения 9003, порт HTTP GUI 8083
  • Клиент: HTTP GUI порт 8080

Используйте этот сценарий для управления контейнерами:

# чтобы остановить один контейнер:docker-compose stop server1# чтобы остановить все контейнеры:docker-compose down# проверить состояние контейнеров:docker-compose ps

Полный текст примера смотрите на GitHub.