
Net

Overview

A handy and efficient alternative to Netty, Apache Mina, and similar solutions for high-performance networking. It is a tiny abstraction layer on top of Eventloop and Java NIO, with adapters for AsyncTcpSocket and AsyncUdpSocket.

Features

  • Support of Promises for read and write operations
  • Compatibility with CSP ChannelSupplier and ChannelConsumer: AsyncTcpSocket can work as a CSP channel with built-in back pressure propagation and can be plugged into a CSP/Datastream pipeline with all its features (buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
  • Extensively optimized, with almost no performance overhead; uses ByteBufPool widely
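To make the buffer-pooling idea from the last bullet concrete, here is a minimal sketch in plain Java. It is not ActiveJ's ByteBufPool: the class name SimpleBufferPool and its methods are hypothetical, it is not thread-safe, and it only illustrates the general technique of recycling byte arrays by power-of-two size class instead of allocating a new one for every read or write.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical pool for illustration only; this is NOT ActiveJ's ByteBufPool. */
final class SimpleBufferPool {
    // freeLists.get(i) holds recycled arrays of length 2^i.
    private final List<ArrayDeque<byte[]>> freeLists = new ArrayList<>();
    private int allocations; // number of arrays actually created with `new`

    SimpleBufferPool() {
        for (int i = 0; i <= 30; i++) {
            freeLists.add(new ArrayDeque<>());
        }
    }

    /** Returns an array of at least minSize bytes (up to 2^30), reusing a recycled one if possible. */
    byte[] allocate(int minSize) {
        int index = sizeClass(minSize);
        byte[] recycled = freeLists.get(index).poll();
        if (recycled != null) {
            return recycled;
        }
        allocations++;
        return new byte[1 << index];
    }

    /** Hands an array obtained from allocate() back so that a later call can reuse it. */
    void recycle(byte[] array) {
        if (Integer.bitCount(array.length) != 1) {
            throw new IllegalArgumentException("only power-of-two arrays belong to this pool");
        }
        freeLists.get(sizeClass(array.length)).push(array);
    }

    int allocations() {
        return allocations;
    }

    // Smallest i such that 2^i >= size.
    private static int sizeClass(int size) {
        return size <= 1 ? 0 : 32 - Integer.numberOfLeadingZeros(size - 1);
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool();
        byte[] first = pool.allocate(1000);  // rounded up to 1024 bytes
        pool.recycle(first);
        byte[] second = pool.allocate(600);  // same size class, so the array is reused
        System.out.println(first == second);
        System.out.println(pool.allocations());
    }
}
```

In this sketch, a request for 600 bytes after recycling a 1024-byte array gets the same array back, so only one allocation ever happens.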

Asynchronous sockets

A fully asynchronous TCP socket with TLS support. It sends and receives data over the network and can be used as a building block for custom TCP servers and clients or for custom networking protocols. The socket has a small API that consists of read and write methods. The CSP module can be used to wrap a socket into a ChannelSupplier or ChannelConsumer.

There is also an asynchronous UDP socket for UDP communications.
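The shape of a promise-based read/write API can be sketched with the JDK's CompletableFuture. Everything below is an assumption made for illustration: the PromiseSocket interface and the in-memory LoopbackSocket are hypothetical stand-ins, not ActiveJ's AsyncTcpSocket or its Promise type. The point is only that read and write return immediately and the result arrives later through a future that can be chained.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

final class PromiseSocketSketch {
    /** Hypothetical promise-style socket API (names are illustrative only). */
    interface PromiseSocket {
        CompletableFuture<byte[]> read();            // completes with the next chunk
        CompletableFuture<Void> write(byte[] data);  // completes once the data is accepted
    }

    /** In-memory loopback: whatever is written can be read back, in order. */
    static final class LoopbackSocket implements PromiseSocket {
        private final ArrayDeque<byte[]> buffered = new ArrayDeque<>();
        private final ArrayDeque<CompletableFuture<byte[]>> pendingReads = new ArrayDeque<>();

        @Override
        public CompletableFuture<byte[]> read() {
            byte[] chunk = buffered.poll();
            if (chunk != null) {
                return CompletableFuture.completedFuture(chunk);
            }
            // No data yet: hand out a future that a later write() will complete.
            CompletableFuture<byte[]> pending = new CompletableFuture<>();
            pendingReads.add(pending);
            return pending;
        }

        @Override
        public CompletableFuture<Void> write(byte[] data) {
            CompletableFuture<byte[]> waiting = pendingReads.poll();
            if (waiting != null) {
                waiting.complete(data);
            } else {
                buffered.add(data);
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Requests a read before any data exists, then writes, then waits for the reply. */
    static String pingPong(PromiseSocket socket) {
        CompletableFuture<byte[]> reply = socket.read();
        return socket.write("PING".getBytes(StandardCharsets.US_ASCII))
                .thenCompose(ignored -> reply)
                .thenApply(bytes -> new String(bytes, StandardCharsets.US_ASCII))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(pingPong(new LoopbackSocket()));
    }
}
```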

Server

The AbstractServer class serves as a foundation for building Eventloop-aware TCP servers (HTTP servers, RPC servers, TCP file services, etc.).

The ready-to-use PrimaryServer implementation works in the primary Eventloop as a balancer. It takes external “accept” requests and redistributes them to WorkerServers, which then execute the actual “accept” requests in their corresponding worker Eventloop threads.
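The primary/worker split can be illustrated with a small plain-Java model. This is a sketch under stated assumptions rather than ActiveJ code: Primary and Worker are hypothetical classes, a single-threaded executor stands in for each worker Eventloop thread, connections are plain strings, and round-robin is assumed as the distribution policy (the text above only says that requests are redistributed).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PrimaryWorkerSketch {
    /** Stand-in for a worker server bound to its own single-threaded loop. */
    static final class Worker {
        final ExecutorService loop = Executors.newSingleThreadExecutor();
        final List<String> handled = new ArrayList<>(); // modified only on `loop`

        Future<?> accept(String connection) {
            return loop.submit(() -> { handled.add(connection); });
        }
    }

    /** Stand-in for the primary server: passes each connection to the next worker in turn. */
    static final class Primary {
        private final List<Worker> workers;
        private int next;

        Primary(List<Worker> workers) {
            this.workers = workers;
        }

        Future<?> onAccept(String connection) {
            Worker worker = workers.get(next);
            next = (next + 1) % workers.size(); // assumed round-robin policy
            return worker.accept(connection);
        }
    }

    /** Distributes the connections and returns how many each worker handled. */
    static List<Integer> distribute(int workerCount, int connections) {
        List<Worker> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            workers.add(new Worker());
        }
        Primary primary = new Primary(workers);
        List<Future<?>> pending = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            pending.add(primary.onAccept("conn-" + i));
        }
        List<Integer> counts = new ArrayList<>();
        try {
            for (Future<?> task : pending) {
                task.get(); // wait until the worker thread has processed it
            }
            for (Worker worker : workers) {
                counts.add(worker.handled.size());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            for (Worker worker : workers) {
                worker.loop.shutdown();
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(distribute(3, 10)); // prints [4, 3, 3]
    }
}
```

Each connection is handled entirely on its worker's thread, so per-connection state needs no locking; only the handoff crosses threads.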

Examples

Note

To run the examples, clone ActiveJ from GitHub:

git clone https://github.com/activej/activej

Then import it as a Maven project and check out tag v4.3. Build the project before running the examples, which are located at activej/examples/core/net.

Ping-Pong Socket Connection

In this example we use SimpleServer, an implementation of AbstractServer, which receives a message and sends a response (PONG). We also use AsyncTcpSocketNio as a client to send 3 request messages (PING).

public static void main(String[] args) throws IOException {
    Eventloop eventloop = Eventloop.create().withCurrentThread();

    // Server side: decode a request, print it, send the response, and repeat
    // for as long as each round completes successfully
    SimpleServer server = SimpleServer.create(
            socket -> {
                BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
                repeat(() ->
                        bufsSupplier.decode(DECODER)
                                .whenResult(x -> System.out.println(x))
                                .then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
                                .map($ -> true))
                        .whenComplete(socket::close);
            })
            .withListenAddress(ADDRESS)
            .withAcceptOnce();

    server.listen();

    // Client side: send a request and wait for the response, ITERATIONS times
    AsyncTcpSocketNio.connect(ADDRESS)
            .whenResult(socket -> {
                BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
                loop(0,
                        i -> i < ITERATIONS,
                        i -> socket.write(wrapAscii(REQUEST_MSG))
                                .then(() -> bufsSupplier.decode(DECODER)
                                        .whenResult(x -> System.out.println(x))
                                        .map($2 -> i + 1)))
                        .whenComplete(socket::close);
            })
            .whenException(e -> { throw new RuntimeException(e); });

    eventloop.run();
}

See full example on GitHub
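The client above drives its requests with an asynchronous loop: each iteration starts only after the previous write and decode have completed. A minimal sketch of that control flow, using the JDK's CompletableFuture in place of ActiveJ promises, might look like this. The loop helper here is a hypothetical re-implementation written for this illustration, and the "socket" is just a list that records messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

final class AsyncLoopSketch {
    /**
     * Hypothetical async loop: while the condition holds for the current value,
     * run `next` and continue with its result once that future completes.
     */
    static <T> CompletableFuture<T> loop(T seed, Predicate<T> condition,
                                         Function<T, CompletableFuture<T>> next) {
        if (!condition.test(seed)) {
            return CompletableFuture.completedFuture(seed);
        }
        return next.apply(seed).thenCompose(value -> loop(value, condition, next));
    }

    /** Simulates `iterations` request/response rounds and returns the message log. */
    static List<String> run(int iterations) {
        List<String> log = new ArrayList<>();
        loop(0,
                i -> i < iterations,
                i -> send(log, "PING")                           // "write" the request
                        .thenCompose(ignored -> send(log, "PONG")) // then "receive" the reply
                        .thenApply(ignored -> i + 1))            // then advance the counter
                .join();
        return log;
    }

    // Stand-in for a socket operation that completes immediately.
    private static CompletableFuture<Void> send(List<String> log, String message) {
        log.add(message);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints [PING, PONG, PING, PONG, PING, PONG]
    }
}
```

The recursion in this sketch is fine for a handful of iterations; a production helper would need to avoid deep call stacks when futures complete synchronously.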

CSP TCP Client

A simple TCP console client that connects to a TCP server:

private void run() {
    System.out.println("Connecting to server at localhost (port 9922)...");
    eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
        if (e == null) {
            System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
            AsyncTcpSocket socket;
            try {
                socket = AsyncTcpSocketNio.wrapChannel(getCurrentEventloop(), socketChannel, null);
            } catch (IOException ioException) {
                throw new RuntimeException(ioException);
            }

            // Print every CRLF-terminated message received from the server
            BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
                    .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
                    .streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));

            startCommandLineInterface(socket);
        } else {
            System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
        }
    });

    eventloop.run();
}

public static void main(String[] args) {
    new TcpClientExample().run();
}

It sends the entered text to the server, receives data back through a CSP channel, parses it, and prints it to the console.

See full example on GitHub
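The client relies on a decoder that splits the incoming byte stream into CRLF-terminated messages. TCP delivers bytes in arbitrary chunks, so a message may arrive in several pieces or several messages may arrive at once. The following plain-Java sketch shows one way such framing can work; CrlfFramer is a hypothetical class written for this illustration and does not reflect how ActiveJ's ByteBufsDecoder is implemented.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical CRLF framer; not ActiveJ's ByteBufsDecoder. */
final class CrlfFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean lastWasCr; // true if the last buffered byte is '\r'

    /** Feeds one network chunk and returns every message that it completes. */
    List<String> feed(byte[] chunk) {
        List<String> messages = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n' && lastWasCr) {
                byte[] bytes = pending.toByteArray();
                // Drop the trailing '\r' that was buffered on the previous step.
                messages.add(new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8));
                pending.reset();
                lastWasCr = false;
            } else {
                pending.write(b);
                lastWasCr = (b == '\r');
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        CrlfFramer framer = new CrlfFramer();
        // "hello" is split across three chunks, and the CRLF itself is split in two.
        System.out.println(framer.feed("hel".getBytes(StandardCharsets.UTF_8)));
        System.out.println(framer.feed("lo\r".getBytes(StandardCharsets.UTF_8)));
        System.out.println(framer.feed("\nworld\r\n!\r\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Keeping the unfinished tail between calls is what lets the framer survive a message, or even the CRLF pair itself, being split across two reads.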

CSP TCP Server

A simple TCP echo server that runs in an Eventloop:

public static void main(String[] args) throws Exception {
    Eventloop eventloop = Eventloop.create().withCurrentThread();

    SimpleServer server = SimpleServer.create(socket ->
            BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
                    // Split the incoming bytes into CRLF-terminated messages
                    .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
                    .peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
                    // Prefix each message with "Server> "
                    .map(buf -> {
                        ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
                        return ByteBufPool.append(serverBuf, buf);
                    })
                    // Re-append the CRLF terminator and send the result back
                    .map(buf -> ByteBufPool.append(buf, CRLF))
                    .streamTo(ChannelConsumer.ofSocket(socket)))
            .withListenPort(PORT);

    server.listen();

    System.out.println("Server is running");
    System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");

    eventloop.run();
}

This server listens for connections. When a client connects, the server parses each message and sends it back through the socket, which is used as a CSP channel.

See full example on GitHub

Datastream TCP Client

graph TB
  id3-.->id4
  subgraph Client
    id1(produce ints)-->id2(serialize them into bytes)
    id2-->id3(send bytes over the network)
    id9(receive those bytes from network)-->id10(deserialize bytes into strings)
    id10-->id11(collect those strings in a list)
  end
  id8-.->id9
  subgraph Server
    id4(receive bytes from network)-->id5(deserialize bytes into ints)
    id5-->id6(compute strings from those ints somehow)
    id6-->id7(serialize strings into bytes)
    id7-->id8(send those bytes over the network)
  end

This diagram illustrates the communication and transformations between a TCP client and a server that use Datastream for data processing.

public final class TcpClientExample {
    public static final int PORT = 9922;

    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.create().withFatalErrorHandler(rethrowOnAnyError());

        eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
            if (e == null) {
                AsyncTcpSocket socket;
                try {
                    socket = AsyncTcpSocketNio.wrapChannel(eventloop, socketChannel, null);
                } catch (IOException ioEx) {
                    throw new RuntimeException(ioEx);
                }

                // Outgoing: produce ints, serialize them, and send the bytes to the server
                StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                        .transformWith(ChannelSerializer.create(INT_SERIALIZER))
                        .streamTo(ChannelConsumer.ofSocket(socket));

                // Incoming: deserialize the received bytes into strings and collect them
                StreamConsumerToList<String> consumer = StreamConsumerToList.create();
                ChannelSupplier.ofSocket(socket)
                        .transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
                        .streamTo(consumer);

                consumer.getResult()
                        .whenResult(list -> list.forEach(System.out::println));
            } else {
                System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
            }
        });

        eventloop.run();
    }
}

See full example on GitHub

Datastream TCP Server

This server corresponds to the Server in the illustration above:

public final class TcpServerExample {
    public static void main(String[] args) throws IOException {
        Eventloop eventloop = Eventloop.create();

        eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {
            AsyncTcpSocket socket;
            try {
                socket = AsyncTcpSocketNio.wrapChannel(eventloop, channel, null);
                System.out.println("Client connected: " + channel.getRemoteAddress());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }

            // Bytes in -> ints -> strings -> bytes out, all on the same socket
            ChannelSupplier.ofSocket(socket)
                    .transformWith(ChannelDeserializer.create(INT_SERIALIZER))
                    .transformWith(StreamFilter.mapper(x -> x + " times 10 = " + x * 10))
                    .transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
                    .streamTo(ChannelConsumer.ofSocket(socket));
        });

        System.out.println("Connect to the server by running datastream.TcpClientExample");

        eventloop.run();
    }
}

See full example on GitHub
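The full round trip from the illustration (ints serialized by the client, turned into strings by the server, and deserialized back by the client) can be simulated in memory with plain JDK streams. This is only a sketch of the data flow: the class and method names are hypothetical, byte arrays stand in for the network, and DataOutputStream's writeInt/writeUTF encoding is an assumption that does not match the wire format of ActiveJ's INT_SERIALIZER and UTF8_SERIALIZER.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

final class DatastreamRoundTripSketch {
    /** Client, outgoing: serialize ints into bytes. */
    static byte[] serializeInts(List<Integer> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int value : values) {
                out.writeInt(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Server: deserialize ints, compute a string for each, serialize the strings. */
    static byte[] serve(byte[] request) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
             DataOutputStream out = new DataOutputStream(bytes)) {
            while (in.available() > 0) {
                int x = in.readInt();
                out.writeUTF(x + " times 10 = " + x * 10);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Client, incoming: deserialize bytes into strings and collect them in a list. */
    static List<String> deserializeStrings(byte[] response) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(response))) {
            while (in.available() > 0) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    static List<String> roundTrip(List<Integer> values) {
        return deserializeStrings(serve(serializeInts(values)));
    }

    public static void main(String[] args) {
        roundTrip(List.of(1, 2, 3)).forEach(System.out::println);
    }
}
```

Unlike this batch-style sketch, the Datastream pipeline in the examples processes items as they arrive, so the server can start replying before the client has finished sending.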