跳到主要内容

净值

概述

是Netty、Apache Mina和其他类似高性能网络解决方案的便捷、高效的替代方案。 在 Eventloop 和Java NIO Adapters之上的一个微小的抽象层,用于 "AsyncTcpSocket",即 "AsyncTcpSocket"。 连同 "AsyncUdpSocket"(同步UdpSocket)。

特点

  • 支持 Promises ,用于读和写操作。
  • CSP相容。 渠道供应商渠道消费者"。 AsyncTcpSocket可以作为具有内置背压传播的CSP通道工作,并且可以插入 CSP/Datastream 管道及其所有功能(如缓冲、压缩、序列化/反序列化、数据转换、数据过滤、还原等等)。
  • 广泛优化,几乎没有性能开销,使用 ByteBufPool 广泛使用

异步套接字

一个完全异步的 TCP套接字 具有TLS支持。 允许向/从网络发送/接收数据。 可作为创建自定义TCP服务器/客户端的构建模块 ,或实现自定义网络协议。 Socket有一个极其简单和直观的API,由 / 方法组成。 CSP 模块可用于将插座包成 渠道供应商渠道消费者"。

还有一个异步的 UDP套接字 用于UDP通信。

服务器

ǞǞǞ AbstractServer(抽象服务器) 类可作为构建Eventloop感知的TCP服务器(HTTP服务器、RPC服务器、TCP文件服务等)的基础。

即用型 主服务器"。 实现,在主要的Eventloops中作为平衡器工作。 它接收外部 "接受 "请求,并将其重新分配给WorkerServers,然后在其相应的Worker Eventloop线程中执行实际的 "接受 "请求。

实例

note

要运行例子,你需要从GitHub克隆ActiveJ

git clone https://github.com/activej/activej

并将其作为一个Maven项目导入。 查看标签 v5.0。 在运行这些例子之前,先建立项目。 这些例子位于 activej/examples/core/net。

乒乓插座连接

在这个例子中,我们使用的是 AbstractServer 的一个实现。 简易服务器 它接收一个信息并发送一个响应(PONG)。 我们还将使用 "AsyncTcpSocketNio"。 作为客户端发送3条请求信息(PING)。

public static void main(String[] args) throws IOException {  Eventloop eventloop = Eventloop.create().withCurrentThread();
  SimpleServer server = SimpleServer.create(      socket -> {        BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));        repeat(() ->            bufsSupplier.decode(DECODER)                .whenResult(x -> System.out.println(x))                .then(() -> socket.write(wrapAscii(RESPONSE_MSG)))                .map($ -> true))            .whenComplete(socket::close);      })      .withListenAddress(ADDRESS)      .withAcceptOnce();
  server.listen();
  AsyncTcpSocketNio.connect(ADDRESS)      .whenResult(socket -> {        BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));        loop(0,            i -> i < ITERATIONS,            i -> socket.write(wrapAscii(REQUEST_MSG))                .then(() -> bufsSupplier.decode(DECODER)                    .whenResult(x -> System.out.println(x))                    .map($2 -> i + 1)))            .whenComplete(socket::close);      })      .whenException(e -> { throw new RuntimeException(e); });
  eventloop.run();}

在GitHub上看到完整的例子

CSP TCP 客户端

一个简单的TCP控制台客户端,连接到TCP服务器。

private void run() {  System.out.println("Connecting to server at localhost (port 9922)...");  eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {    if (e == null) {      System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");      AsyncTcpSocket socket;      try {        socket = AsyncTcpSocketNio.wrapChannel(getCurrentEventloop(), socketChannel, null);      } catch (IOException ioException) {        throw new RuntimeException(ioException);      }
      BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))          .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())          .streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));
      startCommandLineInterface(socket);    } else {      System.out.printf("Could not connect to server, make sure it is started: %s%n", e);    }  });  eventloop.run();}
public static void main(String[] args) {  new TcpClientExample().run();}

它发送字符,通过CSP通道接收一些数据回来,对其进行解析,然后打印出来到控制台。 在GitHub上看到完整的例子

CSP TCP服务器

简单的TCP回声服务器,在一个事件循环中运行。

public static void main(String[] args) throws Exception {  Eventloop eventloop = Eventloop.create().withCurrentThread();
  SimpleServer server = SimpleServer.create(socket ->      BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))          .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())          .peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))          .map(buf -> {            ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");            return ByteBufPool.append(serverBuf, buf);          })          .map(buf -> ByteBufPool.append(buf, CRLF))          .streamTo(ChannelConsumer.ofSocket(socket)))      .withListenPort(PORT);
  server.listen();
  System.out.println("Server is running");  System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");
  eventloop.run();}

这个服务器监听连接,当客户端连接时,它解析其信息并通过套接字将其作为CSP通道发送回来。 在GitHub上看到完整的例子

Datastream TCP客户端

graph TB id3-.->id4 subgraph Client id1(produce ints)-->id2(serialize them into bytes) id2-->id3(send bytes over network) id9(receive those bytes from network)-->id10(deserialize bytes into strings) id10--> id11(collect those strings in a list) end id8-.->id9 subgraph Server id4(从网络接收字节)-->id5(将字节反序列化为整数) id5-->id6(从这些整数计算字符串) id6-->id7(将字符串序列化为字节) id7-->id8(通过网络发送这些字节) end

该图片说明了使用 Datastream 进行数据处理的TCP客户和服务器之间的通信和转换。

public final class TcpClientExample {  public static final int PORT = 9922;
  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withEventloopFatalErrorHandler(rethrow());
    eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {      if (e == null) {        AsyncTcpSocket socket;        try {          socket = AsyncTcpSocketNio.wrapChannel(eventloop, socketChannel, null);        } catch (IOException ioEx) {          throw new RuntimeException(ioEx);        }
        StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)            .transformWith(ChannelSerializer.create(INT_SERIALIZER))            .streamTo(ChannelConsumer.ofSocket(socket));
        StreamConsumerToList<String> consumer = StreamConsumerToList.create();
        ChannelSupplier.ofSocket(socket)            .transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))            .streamTo(consumer);
        consumer.getResult()            .whenResult(list -> list.forEach(System.out::println));
      } else {        System.out.printf("Could not connect to server, make sure it is started: %s%n", e);      }    });
    eventloop.run();  }}

在GitHub上看到完整的例子

Datastream TCP服务器

这个服务器代表了上图中的 服务器

public final class TcpServerExample {
  public static void main(String[] args) throws IOException {    Eventloop eventloop = Eventloop.create();
    eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {      AsyncTcpSocket socket;
      try {        socket = AsyncTcpSocketNio.wrapChannel(eventloop, channel, null);        System.out.println("Client connected: " + channel.getRemoteAddress());      } catch (IOException e) {        throw new RuntimeException(e);      }
      ChannelSupplier.ofSocket(socket)          .transformWith(ChannelDeserializer.create(INT_SERIALIZER))          .transformWith(StreamFilter.mapper(x -> x + " times 10 = " + x * 10))          .transformWith(ChannelSerializer.create(UTF8_SERIALIZER))          .streamTo(ChannelConsumer.ofSocket(socket));    });
    System.out.println("Connect to the server by running datastream.TcpClientExample");
    eventloop.run();  }}

在GitHub上看到完整的例子