跳到主要内容

示例:

note

为了在IDE中运行这些例子,你需要克隆ActiveJ项目。

git clone https://github.com/activej/activej

并将其作为一个Maven项目导入。 查阅分支机构 v5.1。 在运行这个例子之前,构建项目(Ctrl F9 for IntelliJ IDEA)。

服务器设置

让我们仔细看看 服务器设置实例。 为了使设置和启动尽可能简单,有一个特殊的 "SimpleTcpServerLauncher"。,一个 ActiveJ Launcher 实现(抽象地实现了 的主要 方法)。 它允许简单地设置应用程序,所以你设置FS服务器所需要的只是覆盖几个Launcher方法。

  • onInit - 在应用程序启动前运行。
  • getOverrideModule - 覆盖Launcher的默认内部模块定义。
  • run - 启动器的主要方法,代表业务逻辑

然后 启动启动器"。

public class ServerSetupExample extends SimpleTcpServerLauncher {  private Path storage;
  @Override  protected void onInit(Injector injector) throws Exception {    storage = Files.createTempDirectory("server_storage");  }
  @Override  protected Config createConfig() {    return super.createConfig()        .with("activefs.path", storage.toString())        .with("activefs.listenAddresses", "6732");  }
  @Override  protected void run() throws Exception {    awaitShutdown();  }
  public static void main(String[] args) throws Exception {    Launcher launcher = new ServerSetupExample();    launcher.launch(args);  }}

在GitHub上看到完整的例子

文件上传

FileUploadExample 也扩展了 Launcher ,因此实现了上述的 Launcher 方法。 在这个例子中,我们将使用一个 ActiveFs 实例,它依赖于异步 ActiveJ Eventloop 为了简化依赖关系的工作,我们将使用 ActiveJ Inject DI库。 它速度快,效率高,与Launcher完美兼容。 所以我们只是 @Inject 两个实例和 @提供 工厂方法。 就像前面的例子一样,我们也将覆盖 Launcher 方法 onInit, getOverrideModule, 和 run。 另外,这个例子利用了ActiveJ CSP 组件,特别是 频道文件读取器 类。 它允许异步地从文件中读取二进制数据。 你可以在以下网站看到完整的例子来源 GitHub,这里我们将只考虑被覆盖的方法 run中定义的上传过程。

@Overrideprotected void run() throws Exception {  ExecutorService executor = newSingleThreadExecutor();  CompletableFuture<Void> future = eventloop.submit(() ->      // consumer result here is a marker of it being successfully uploaded      ChannelFileReader.open(executor, clientFile)          .then(cfr -> cfr.streamTo(client.upload(FILE_NAME, EXAMPLE_TEXT.length())))          .whenResult(() -> System.out.printf("%nFile '%s' successfully uploaded%n%n", FILE_NAME))  );  try {    future.get();  } finally {    executor.shutdown();  }}

在GitHub上看到完整的例子

文件下载

FileDownloadExample 有一个实现类似于 文件上传 的例子。 这里我们将只考虑被覆盖的方法 run中定义的下载过程。

@Overrideprotected void run() throws Exception {  ExecutorService executor = newSingleThreadExecutor();  CompletableFuture<Void> future = eventloop.submit(() ->      ChannelSupplier.ofPromise(client.download(REQUIRED_FILE))          .streamTo(ChannelFileWriter.open(executor, clientStorage.resolve(DOWNLOADED_FILE)))          .whenResult(() -> System.out.printf("%nFile '%s' successfully downloaded to '%s'%n%n",              REQUIRED_FILE, clientStorage))  );  try {    future.get();  } finally {    executor.shutdown();  }}

在GitHub上看到完整的例子

ActiveFs装饰器

有时你可能需要覆盖/扩展 ActiveFs 实现的默认行为。 要做到这一点,你可以利用 Decorator 模式。 DecoratedActiveFsExample 展示了如何做到这一点。 它对 ActiveFs 实现进行了装饰,为文件上传和下载增加了额外的日志记录。 DecoratedActiveFsExample 扩展了 ServerSetupExample 所以它继承了它的DI绑定。 首先,我们需要覆盖 ActiveFsServer 的绑定,以传递装饰过的 ActiveFs ,而不是原来的那个。 为此,我们将覆盖 Launcher#getOverrideModule 方法,并为 AsyncFsServer 提供一个新的绑定,使用装饰的 ActiveFs

@Overrideprotected Module getOverrideModule() {  return new AbstractModule() {    @Eager    @Provides    ActiveFsServer activeFsServer(Eventloop eventloop, @Named("decorated") ActiveFs decoratedFs, Config config) {      return ActiveFsServer.create(eventloop, decoratedFs)          .withInitializer(ofActiveFsServer(config.getChild("activefs")));    }
    @Provides    @Named("decorated")    ActiveFs decoratedActiveFs(ActiveFs fs) {      return new LoggingActiveFs(fs);    }  };}

正如你所看到的,在 decoratedActiveFs(ActiveFs fs) 方法中,我们请求原始的 ActiveFs ,并返回经过装饰的 (被包裹在 LoggingActiveFs)。

private static final class LoggingActiveFs extends ForwardingActiveFs {  private static final Logger logger = LoggerFactory.getLogger(LoggingActiveFs.class);
  public LoggingActiveFs(ActiveFs peer) {    super(peer);  }
  @Override  public Promise<ChannelConsumer<ByteBuf>> upload(@NotNull String name, long size) {    return super.upload(name)        .map(consumer -> {          logger.info("Starting upload of file: {}. File size is {} bytes", name, size);          return consumer              .withAcknowledgement(ack -> ack                  .whenResult(() -> logger.info("Upload of file {} finished", name)));        });  }
  @Override  public Promise<ChannelSupplier<ByteBuf>> download(@NotNull String name, long offset, long limit) {    return super.download(name, offset, limit)        .map(supplier -> {          logger.info("Starting downloading file: {}", name);          return supplier              .withEndOfStream(eos -> eos                  .whenResult(() -> logger.info("Download of file {} finished", name)));        });
  }}

LoggingActiveFs 扩展了 ForwardingActiveFs ,它只是将所有的 ActiveFs 方法调用委托给一些底层 ActiveFs 实例。 我们覆盖我们想要装饰的方法(下载, 上传)并在 上传/下载开始和结束时添加自定义日志信息。 你可以运行 FileUploadExample ,然后是 FileDownloadExample 之后你应该看到日志输出。

INFO 开始上传文件:example.txt。 文件大小为12字节INFO 上传文件example.txt完成INFO 开始下载文件: example.txtINFO 下载文件example.txt完成

在GitHub上看到完整的例子

集群文件存储

通过ActiveJ FS,你可以简单地创建具有高容错性的分布式集群文件存储。 我们将使用Docker来启动三个虚拟服务器和一个客户端。 存储将支持文件上传,并根据所提供的规则和复制数量进行自动重新分区。

我们需要做的第一件事是为我们的服务器创建一个启动器类 ClusterTcpServerLauncher。 延长 SimpleTcpServerLauncher 来获得所有需要的实例。 ActiveFsServer,当地 "ActiveFS, AsyncHttpServer 为GUI,它将简化与你的存储和其他辅助实例的工作。 在 ClusterTcpServerLauncher ,我们只需要设置任务调度器等重新分区管理的实用程序。 ClusterRepartitionController,以及 FsPartitions 用于跟踪活着的分区和它们的状态。 分区将通过TCP协议进行通信,而GUI服务器将使用HTTP。

@Provides@Eager@Named("repartition")EventloopTaskScheduler repartitionScheduler(Config config, ClusterRepartitionController controller) {  return EventloopTaskScheduler.create(controller.getEventloop(), controller::repartition)      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition")));}
@Provides@Eager@Named("clusterDeadCheck")EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {  return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));}
@ProvidesClusterRepartitionController repartitionController(Config config, ActiveFsServer localServer, FsPartitions partitions) {  String localPartitionId = first(partitions.getAllPartitions());  assert localPartitionId != null;
  return ClusterRepartitionController.create(localPartitionId, partitions)      .withInitializer(ofClusterRepartitionController(config.getChild("activefs.repartition")));}
@ProvidesDiscoveryService discoveryService(Eventloop eventloop, ActiveFs activeFs, Config config) throws MalformedDataException {  return Initializers.constantDiscoveryService(eventloop, activeFs, config);}
@ProvidesFsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService, OptionalDependency<ServerSelector> serverSelector) {  return FsPartitions.create(eventloop, discoveryService)      .withServerSelector(serverSelector.orElse(RENDEZVOUS_HASH_SHARDER));}

现在我们可以继续创建一个客户端启动器 ClusterTcpClientLauncher。 我们需要提供 ClusterRepartitionController 和一个任务调度器来检测死分区。 与服务器启动器类似,我们需要为GUI提供一个 AsyncHttpServer ,为管理分区提供 FsPartitions。 我们还需要一个 ClusterActiveFs 类的实例,这是一个 ActiveFs 实现,作为一个集群在其他分区上运行,并包含一些冗余和故障安全能力。

@Provides@Eager@Named("clusterDeadCheck")EventloopTaskScheduler deadCheckScheduler(Config config, FsPartitions partitions) {  return EventloopTaskScheduler.create(partitions.getEventloop(), partitions::checkDeadPartitions)      .withInitializer(ofEventloopTaskScheduler(config.getChild("activefs.repartition.deadCheck")));}
@Provides@EagerAsyncHttpServer guiServer(Eventloop eventloop, AsyncServlet servlet, Config config) {  return AsyncHttpServer.create(eventloop, servlet)      .withInitializer(ofHttpServer(config.getChild("activefs.http.gui")));}
@ProvidesAsyncServlet guiServlet(ActiveFs activeFs) {  return ActiveFsGuiServlet.create(activeFs, "Cluster FS Client");}
@ProvidesActiveFs remoteActiveFs(Eventloop eventloop, FsPartitions partitions, Config config) {  return ClusterActiveFs.create(partitions)      .withInitializer(ofClusterActiveFs(config.getChild("activefs.cluster")));}
@ProvidesDiscoveryService discoveryService(Eventloop eventloop, Config config) throws MalformedDataException {  return Initializers.constantDiscoveryService(eventloop, config.getChild("activefs.cluster"));}
@ProvidesFsPartitions fsPartitions(Eventloop eventloop, DiscoveryService discoveryService) {  return FsPartitions.create(eventloop, discoveryService);}

下面是我们的分布式P2P存储的架构。

Distributed P2P storageDistributed P2P storage

你可以根据自己的意愿创建任意多的分区。

为了启动这个例子,运行以下脚本来创建Docker镜像和构建容器(在 activej/launchers/fs 目录下运行所有脚本)。

# 为服务器和客户端构建两个镜像docker build -t cluster-server -f ClusterServerDockerfile .docker build -t cluster-client -f ClusterClientDockerfile .# 在后台启动所有服务器和客户端实例docker-compose up -d

容器将以下列配置建造。

  • Server1: TCP连接端口 9001, HTTP GUI端口 8081
  • Server2: TCP连接端口 9002, HTTP GUI端口 8082
  • Server3: TCP连接端口 9003, HTTP GUI端口 8083
  • 客户端: HTTP GUI端口 8080

使用这个脚本来管理容器。

# 停止一个容器:docker-compose stop server1# 停止所有容器:docker-compose down# 检查容器状态:docker-compose ps

在GitHub上看到完整的例子