Перейти к основному содержанию

Net

Обзор

Удобная и эффективная альтернатива Netty, Apache Mina и другим подобным решениям для высокопроизводительных сетей. Маленький слой абстракции поверх Eventloop и Java NIO Adapters для AsyncTcpSocket вместе с AsyncUdpSocket

Характеристики

  • Поддержка Promises для операций чтения и записи
  • Совместимость с CSP ChannelSupplier и ChannelConsumer AsyncTcpSocket может работать как CSP-канал со встроенным распространением обратного давления, и может быть подключен к CSP/Datastream конвейеру со всеми его возможностями (такими как буферизация, сжатие, сериализация/десериализация, преобразование данных, фильтрация данных, сокращение и т.д.).
  • Экстенсивно оптимизирован и практически не имеет накладных расходов на производительность, использует ByteBufPool широко.

Асинхронные сокеты

Полностью асинхронный TCP сокет с поддержкой TLS. Позволяет отправлять/получать данные в/из сети. Может использоваться как строительный блок для создания пользовательских TCP-серверов/клиентов или реализации пользовательских сетевых протоколов. Socket имеет чрезвычайно простой и интуитивно понятный API, который состоит из методов read/write . CSP модуль может быть использован для обертывания сокета в ChannelSupplier или ChannelConsumer

Существует также асинхронный UDP сокет для UDP-коммуникаций.

Сервер

Сайт AbstractServer класс служит основой для построения TCP-серверов с поддержкой Eventloop (HTTP-серверы, RPC-серверы, файловые службы TCP и т.д.):

  • Поддержка семантики start/stop
  • Реализует EventloopServer
  • Реализует WorkerServer интерфейс, поэтому все подклассы AbstractServer могут быть легко использованы в качестве рабочих серверов.
  • Поддержка ServerSocketSettings и SocketSettings

Готовые к использованию PrimaryServer реализация, которая работает в первичных Eventloops в качестве балансировщика. Он принимает внешние запросы "принять" и перераспределяет их рабочим серверам, которые затем выполняют фактические запросы "принять" в соответствующих рабочих потоках Eventloop.

Примеры

note

Чтобы запустить примеры, необходимо клонировать ActiveJ с GitHub

git clone https://github.com/activej/activej

И импортируйте его как проект Maven. Посмотрите тег v5.4.3. Перед запуском примеров выполните сборку проекта. Эти примеры расположены по адресу activej/examples/core/net.

Подключение розетки пинг-понг

В этом примере мы используем реализацию AbstractServer-... SimpleServer который получает сообщение и отправляет ответ (PONG). Мы также будем использовать AsyncTcpSocketNio в качестве клиента, чтобы отправить 3 сообщения запроса (PING).

public static void main(String[] args) throws IOException {  Eventloop eventloop = Eventloop.create().withCurrentThread();
  SimpleServer server = SimpleServer.create(      socket -> {        BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));        repeat(() ->            bufsSupplier.decode(DECODER)                .whenResult(x -> System.out.println(x))                .then(() -> socket.write(wrapAscii(RESPONSE_MSG)))                .map($ -> true))            .whenComplete(socket::close);      })      .withListenAddress(ADDRESS)      .withAcceptOnce();
  server.listen();
  AsyncTcpSocketNio.connect(ADDRESS)      .whenResult(socket -> {        BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));        loop(0,            i -> i < ITERATIONS,            i -> socket.write(wrapAscii(REQUEST_MSG))                .then(() -> bufsSupplier.decode(DECODER)                    .whenResult(x -> System.out.println(x))                    .map($2 -> i + 1)))            .whenComplete(socket::close);      })      .whenException(e -> { throw new RuntimeException(e); });
  eventloop.run();}

Полный текст примера смотрите на GitHub.

Клиент CSP TCP

Простой консольный клиент TCP, который подключается к серверу TCP:

private void run() {  System.out.println("Connecting to server at localhost (port 9922)...");  eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {    if (e == null) {      System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");      AsyncTcpSocket socket;      try {        socket = AsyncTcpSocketNio.wrapChannel(getCurrentEventloop(), socketChannel, null);      } catch (IOException ioException) {        throw new RuntimeException(ioException);      }
      BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))          .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())          .streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));
      startCommandLineInterface(socket);    } else {      System.out.printf("Could not connect to server, make sure it is started: %s%n", e);    }  });  eventloop.run();}
public static void main(String[] args) {  new TcpClientExample().run();}

Он отправляет символы, получает некоторые данные обратно через канал CSP, анализирует их и затем выводит на консоль. Полный текст примера смотрите на GitHub.

Сервер CSP TCP

Простой TCP эхо-сервер, работающий в режиме eventloop:

public static void main(String[] args) throws Exception {  Eventloop eventloop = Eventloop.create().withCurrentThread();
  SimpleServer server = SimpleServer.create(socket ->      BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))          .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())          .peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))          .map(buf -> {            ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");            return ByteBufPool.append(serverBuf, buf);          })          .map(buf -> ByteBufPool.append(buf, CRLF))          .streamTo(ChannelConsumer.ofSocket(socket)))      .withListenPort(PORT);
  server.listen();
  System.out.println("Server is running");  System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");
  eventloop.run();}

Этот сервер прослушивает соединения, и когда клиент подключается, он разбирает его сообщение и отправляет его обратно как канал CSP через сокет. Полный текст примера смотрите на GitHub.

Клиент Datastream TCP

graph TB id3-.->id4 подграф Client id1(производить инты)-->id2(сериализовать их в байты) id2-->id3(отправлять байты по сети) id9(получать байты из сети)-->id10(десериализовать байты в строки) id10 --> id11(собирать строки в список) end id8-.->id9 subgraph Server id4(получать байты из сети)-->id5(десериализовать байты в ints) id5-->id6(каким-то образом вычислить строки из этих ints) id6-->id7(сериализовать строки в байты) id7-->id8(отправить эти байты по сети) end

Это изображение иллюстрирует взаимодействие и преобразования между TCP-клиентом и сервером, использующим Datastream для обработки данных.

public final class TcpClientExample {  public static final int PORT = 9922;
  public static void main(String[] args) {    Eventloop eventloop = Eventloop.create().withEventloopFatalErrorHandler(rethrow());
    eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {      if (e == null) {        AsyncTcpSocket socket;        try {          socket = AsyncTcpSocketNio.wrapChannel(eventloop, socketChannel, null);        } catch (IOException ioEx) {          throw new RuntimeException(ioEx);        }
        StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)            .transformWith(ChannelSerializer.create(INT_SERIALIZER))            .streamTo(ChannelConsumer.ofSocket(socket));
        StreamConsumerToList<String> consumer = StreamConsumerToList.create();
        ChannelSupplier.ofSocket(socket)            .transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))            .streamTo(consumer);
        consumer.getResult()            .whenResult(list -> list.forEach(System.out::println));
      } else {        System.out.printf("Could not connect to server, make sure it is started: %s%n", e);      }    });
    eventloop.run();  }}

Полный текст примера смотрите на GitHub.

Datastream TCP Server

Этот сервер представляет собой Сервер из приведенной выше иллюстрации:

public final class TcpServerExample {
  public static void main(String[] args) throws IOException {    Eventloop eventloop = Eventloop.create();
    eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {      AsyncTcpSocket socket;
      try {        socket = AsyncTcpSocketNio.wrapChannel(eventloop, channel, null);        System.out.println("Client connected: " + channel.getRemoteAddress());      } catch (IOException e) {        throw new RuntimeException(e);      }
      ChannelSupplier.ofSocket(socket)          .transformWith(ChannelDeserializer.create(INT_SERIALIZER))          .transformWith(StreamFilter.mapper(x -> x + " times 10 = " + x * 10))          .transformWith(ChannelSerializer.create(UTF8_SERIALIZER))          .streamTo(ChannelConsumer.ofSocket(socket));    });
    System.out.println("Connect to the server by running datastream.TcpClientExample");
    eventloop.run();  }}

Полный текст примера смотрите на GitHub.