Net

Overview

A convenient and efficient alternative to Netty, Apache Mina, and similar solutions for high-performance networking. It is a thin layer of abstraction on top of Eventloop and Java NIO, with adapters for AsyncTcpSocket and AsyncUdpSocket.

Features

  • Support of Promises for read and write operations
  • Compatibility with CSP ChannelSupplier and ChannelConsumer: AsyncTcpSocket can operate as a CSP channel with built-in backpressure propagation, and can be plugged into a CSP/Datastream pipeline with all of its features (buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
  • Heavily optimized, with almost no performance overhead; makes extensive use of ByteBufPool
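
As a rough analogy for what backpressure propagation means, the sketch below uses only the JDK: a bounded buffer refuses new items once it is full, so the producer has to wait for the consumer. The class `BackpressureSketch` and its `offerAll` helper are hypothetical names made up for this illustration. ActiveJ CSP channels are promise-based and do not use blocking queues, so this shows the idea rather than the library's mechanism.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureSketch {
    /** Offers items in order and stops at the first refusal; returns how many were accepted. */
    static int offerAll(BlockingQueue<String> buffer, String... items) {
        int accepted = 0;
        for (String item : items) {
            // offer() never blocks: it returns false when the bounded buffer is full
            if (!buffer.offer(item)) {
                break;
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Capacity 2 stands in for a small channel buffer
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);

        int accepted = offerAll(buffer, "a", "b", "c");
        System.out.println("accepted before the consumer ran: " + accepted);

        // The consumer frees one slot, which lets the producer continue
        String taken = buffer.poll();
        System.out.println("consumer took: " + taken);
        System.out.println("producer can resume: " + buffer.offer("c"));
    }
}
```

The producer's pace is dictated by the consumer: the third item is only accepted after one item has been taken out.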

Asynchronous sockets

A fully asynchronous TCP socket with TLS support. It allows you to send data to and receive data from the network, and can be used as a building block for custom TCP servers and clients or for implementing custom networking protocols. The socket has a simple and intuitive API consisting of read/write methods. The CSP module can be used to wrap a socket into a ChannelSupplier or ChannelConsumer.

There is also an asynchronous UDP socket for UDP communications.

Server

The AbstractServer class is the basis for building Eventloop-based TCP servers (HTTP servers, RPC servers, TCP file services, etc.).

There is also a ready-to-use PrimaryServer implementation that works in the primary Eventloop as a balancer. It takes external “accept” requests and redistributes them to the WorkerServers, which then execute the actual “accept” requests in their corresponding worker Eventloop threads.
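
The redistribution idea can be sketched with plain JDK executors. This assumes a simple round-robin policy, which the text above does not specify. `RoundRobinAcceptSketch` and `workerFor` are hypothetical names for this illustration and are not part of ActiveJ; each single-threaded executor stands in for a worker Eventloop thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class RoundRobinAcceptSketch {
    /** Picks the worker index for the n-th accepted connection (assumed round-robin policy). */
    static int workerFor(long connectionNumber, int workerCount) {
        return (int) (connectionNumber % workerCount);
    }

    public static void main(String[] args) throws InterruptedException {
        int workerCount = 3;

        // One single-threaded executor per worker, standing in for a worker event loop
        List<ExecutorService> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(Executors.newSingleThreadExecutor());
        }

        // The "primary" side only decides where a connection goes;
        // the handling itself runs on the chosen worker's thread
        for (long conn = 0; conn < 6; conn++) {
            int idx = workerFor(conn, workerCount);
            long id = conn;
            workers.get(idx).execute(() -> System.out.println(
                    "connection " + id + " handled by worker " + idx
                            + " on " + Thread.currentThread().getName()));
        }

        for (ExecutorService worker : workers) {
            worker.shutdown();
            worker.awaitTermination(1, TimeUnit.SECONDS);
        }
    }
}
```

Keeping each connection on a single worker thread means its handler needs no cross-thread synchronization, which is the main appeal of this layout.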

Examples

Note

To run the examples, clone ActiveJ from GitHub:

git clone https://github.com/activej/activej

Then import it as a Maven project and check out tag v5.3. Build the project before running the examples. The examples are located at activej/examples/core/net.

Ping-Pong Socket Connection

This example uses SimpleServer, an AbstractServer implementation, which receives a message and sends a response (PONG). AsyncTcpSocketNio acts as the client and sends 3 request messages (PING).

public static void main(String[] args) throws IOException {
  Eventloop eventloop = Eventloop.create().withCurrentThread();

  SimpleServer server = SimpleServer.create(
      socket -> {
        BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
        repeat(() ->
            bufsSupplier.decode(DECODER)
                .whenResult(x -> System.out.println(x))
                .then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
                .map($ -> true))
            .whenComplete(socket::close);
      })
      .withListenAddress(ADDRESS)
      .withAcceptOnce();

  server.listen();

  AsyncTcpSocketNio.connect(ADDRESS)
      .whenResult(socket -> {
        BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
        loop(0,
            i -> i < ITERATIONS,
            i -> socket.write(wrapAscii(REQUEST_MSG))
                .then(() -> bufsSupplier.decode(DECODER)
                    .whenResult(x -> System.out.println(x))
                    .map($2 -> i + 1)))
            .whenComplete(socket::close);
      })
      .whenException(e -> { throw new RuntimeException(e); });

  eventloop.run();
}

See full example on GitHub

CSP TCP Client

A simple TCP console client that connects to a TCP server:

private void run() {
  System.out.println("Connecting to server at localhost (port 9922)...");
  eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
    if (e == null) {
      System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
      AsyncTcpSocket socket;
      try {
        socket = AsyncTcpSocketNio.wrapChannel(getCurrentEventloop(), socketChannel, null);
      } catch (IOException ioException) {
        throw new RuntimeException(ioException);
      }

      BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
          .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
          .streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));

      startCommandLineInterface(socket);
    } else {
      System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
    }
  });
  eventloop.run();
}

public static void main(String[] args) {
  new TcpClientExample().run();
}

It sends characters, receives some data back through the CSP channel, parses it and then outputs it to the console.
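
The parsing step matters because TCP delivers a byte stream, not messages: one line may arrive split across several reads. The JDK-only sketch below shows the general idea of CRLF framing. `CrlfFramingSketch` and its `feed` method are hypothetical names for this illustration; this is not how ByteBufsDecoder is implemented, only a minimal model of what a CRLF-terminated decoder has to do.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class CrlfFramingSketch {
    // Bytes received so far that do not yet form a complete line
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean previousWasCr = false;

    /** Feeds one network chunk; returns every line this chunk completes, with CRLF stripped. */
    List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n' && previousWasCr) {
                byte[] raw = pending.toByteArray();
                // Drop the trailing '\r' that was buffered on the previous step
                lines.add(new String(raw, 0, raw.length - 1, StandardCharsets.UTF_8));
                pending.reset();
                previousWasCr = false;
            } else {
                pending.write(b);
                previousWasCr = (b == '\r');
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        CrlfFramingSketch decoder = new CrlfFramingSketch();
        // One logical line arrives in three chunks, with the CRLF itself split in two
        String[] chunks = {"he", "llo\r", "\nworld\r\n"};
        for (String chunk : chunks) {
            List<String> lines = decoder.feed(chunk.getBytes(StandardCharsets.UTF_8));
            System.out.println("chunk of " + chunk.length() + " chars completed lines: " + lines);
        }
    }
}
```

The decoder keeps state between calls, so a line is emitted only once its terminating CRLF has fully arrived, regardless of how the network split the bytes.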

See full example on GitHub

CSP TCP Server

A simple TCP echo server running in an eventloop:

public static void main(String[] args) throws Exception {
  Eventloop eventloop = Eventloop.create().withCurrentThread();

  SimpleServer server = SimpleServer.create(socket ->
      BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
          .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
          .peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
          .map(buf -> {
            ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
            return ByteBufPool.append(serverBuf, buf);
          })
          .map(buf -> ByteBufPool.append(buf, CRLF))
          .streamTo(ChannelConsumer.ofSocket(socket)))
      .withListenPort(PORT);

  server.listen();

  System.out.println("Server is running");
  System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");

  eventloop.run();
}

This server listens for connections. When a client connects, it parses each CRLF-terminated message, prefixes it with "Server> ", and streams it back to the socket through a CSP channel.

See full example on GitHub

Datastream TCP Client

graph TB
  id3-.->id4
  subgraph Client
    id1(produce ints)-->id2(serialize them into bytes)
    id2-->id3(send bytes over the network)
    id9(receive those bytes from network)-->id10(deserialize bytes into strings)
    id10 --> id11(collect those strings in a list)
  end
  id8-.->id9
  subgraph Server
    id4(receive bytes from network)-->id5(deserialize bytes into ints)
    id5-->id6(compute strings from those ints somehow)
    id6-->id7(serialize strings into bytes)
    id7-->id8(send those bytes over the network)
  end

This diagram illustrates the communication and data transformations between a TCP client and a TCP server that use Datastream to process data.
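
To make the stages concrete, here is a JDK-only sketch that runs the same sequence of transformations in memory, with byte arrays standing in for the network. `PipelineSketch` and all of its method names are hypothetical. The encodings (4-byte big-endian ints, length-prefixed UTF-8 strings) are assumptions chosen for simplicity and are not the wire format of ChannelSerializer. The string computation mirrors the mapper used in the server example on this page.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PipelineSketch {
    /** Client: serialize ints into bytes (assumed format: 4 bytes per int). */
    static byte[] serializeInts(List<Integer> ints) {
        ByteBuffer buf = ByteBuffer.allocate(4 * ints.size());
        for (int x : ints) buf.putInt(x);
        return buf.array();
    }

    /** Server: deserialize bytes into ints. */
    static List<Integer> deserializeInts(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<Integer> ints = new ArrayList<>();
        while (buf.remaining() >= 4) ints.add(buf.getInt());
        return ints;
    }

    /** Server: serialize strings into bytes (assumed format: int length, then UTF-8 bytes). */
    static byte[] serializeStrings(List<String> strings) {
        List<byte[]> encoded = new ArrayList<>();
        int total = 0;
        for (String s : strings) {
            byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
            encoded.add(utf8);
            total += 4 + utf8.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        for (byte[] utf8 : encoded) buf.putInt(utf8.length).put(utf8);
        return buf.array();
    }

    /** Client: deserialize bytes into strings. */
    static List<String> deserializeStrings(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<String> strings = new ArrayList<>();
        while (buf.remaining() >= 4) {
            byte[] utf8 = new byte[buf.getInt()];
            buf.get(utf8);
            strings.add(new String(utf8, StandardCharsets.UTF_8));
        }
        return strings;
    }

    /** Runs every stage of the diagram in order, without any real networking. */
    static List<String> roundTrip(List<Integer> ints) {
        byte[] request = serializeInts(ints);                // client -> "network"
        List<Integer> received = deserializeInts(request);   // server side
        List<String> computed = new ArrayList<>();
        for (int x : received) computed.add(x + " times 10 = " + x * 10);
        byte[] response = serializeStrings(computed);        // server -> "network"
        return deserializeStrings(response);                 // client collects a list
    }

    public static void main(String[] args) {
        roundTrip(Arrays.asList(1, 2, 3)).forEach(System.out::println);
    }
}
```

In the real examples these stages run as streaming transformers over a socket, so items flow through one by one instead of being materialized as whole lists.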

public final class TcpClientExample {
  public static final int PORT = 9922;

  public static void main(String[] args) {
    Eventloop eventloop = Eventloop.create().withEventloopFatalErrorHandler(rethrow());

    eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
      if (e == null) {
        AsyncTcpSocket socket;
        try {
          socket = AsyncTcpSocketNio.wrapChannel(eventloop, socketChannel, null);
        } catch (IOException ioEx) {
          throw new RuntimeException(ioEx);
        }

        StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .transformWith(ChannelSerializer.create(INT_SERIALIZER))
            .streamTo(ChannelConsumer.ofSocket(socket));

        StreamConsumerToList<String> consumer = StreamConsumerToList.create();

        ChannelSupplier.ofSocket(socket)
            .transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
            .streamTo(consumer);

        consumer.getResult()
            .whenResult(list -> list.forEach(System.out::println));

      } else {
        System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
      }
    });

    eventloop.run();
  }
}

See full example on GitHub

Datastream TCP Server

This server implements the Server side of the diagram above:

public final class TcpServerExample {

  public static void main(String[] args) throws IOException {
    Eventloop eventloop = Eventloop.create();

    eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {
      AsyncTcpSocket socket;

      try {
        socket = AsyncTcpSocketNio.wrapChannel(eventloop, channel, null);
        System.out.println("Client connected: " + channel.getRemoteAddress());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }

      ChannelSupplier.ofSocket(socket)
          .transformWith(ChannelDeserializer.create(INT_SERIALIZER))
          .transformWith(StreamFilter.mapper(x -> x + " times 10 = " + x * 10))
          .transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
          .streamTo(ChannelConsumer.ofSocket(socket));
    });

    System.out.println("Connect to the server by running datastream.TcpClientExample");

    eventloop.run();
  }
}

See full example on GitHub