Skip to main content

Net

Overview

A convenient and efficient alternative to Netty, Apache Mina, and other similar solutions for high performance networking. A tiny layer of abstraction on top of reactor (Eventloop) and Java NIO Adapters for TCP socket along with UDP socket

Features

  • Support of Promises for read and write operations
  • Compatibility with CSP ChannelSupplier and ChannelConsumer TcpSocket can operate as a CSP channel with built-in backpressure propagation, and can be plugged into CSP/Datastream pipeline with all its features (such as buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
  • Significantly optimized and with almost no performance overhead, makes extensive use of ByteBufPool

Reactive sockets

Fully reactive TCP socket with TLS support. Allows you to send/receive data to/from the network. It can be used as a building block for creating custom TCP servers/clients or implementing custom networking protocols. Socket has a simple and intuitive API consisting of read/write methods. The CSP module can be used to wrap a socket into a ChannelSupplier or ChannelConsumer

There is also an asynchronous UDP socket for UDP communications.

Server

The AbstractReactiveServer class is the basis for building reactor-based TCP servers (HTTP servers, RPC servers, TCP file services, etc.):

A ready-to-use PrimaryServer implementation that works in primary reactor as a balancer. It takes external “accept” requests and redistributes them to the WorkerServers, which then execute the actual “accept” requests in their corresponding worker reactor threads.

Examples

note

To run the examples, you need to clone ActiveJ from GitHub

git clone https://github.com/activej/activej

And import it as a Maven project. Check out tag v6.0-beta2. Before running the examples, build the project. These examples are located at activej/examples/core/net

Ping-Pong Socket Connection

In this example we will use the AbstractReactiveServer implementation, SimpleServer, which receives a message and sends a response (PONG). We will also use TCP socket as a client to send 3 request messages (PING).

public static void main(String[] args) throws IOException {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();

SimpleServer server = SimpleServer.builder(
eventloop,
socket -> {
BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket));
repeat(() ->
bufsSupplier.decode(DECODER)
.whenResult(x -> System.out.println(x))
.then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
.map($ -> true))
.whenComplete(socket::close);
})
.withListenAddress(ADDRESS)
.withAcceptOnce()
.build();

server.listen();

TcpSocket.connect(eventloop, ADDRESS)
.whenResult(socket -> {
BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket));
loop(0,
i -> i < ITERATIONS,
i -> socket.write(wrapAscii(REQUEST_MSG))
.then(() -> bufsSupplier.decode(DECODER)
.whenResult(x -> System.out.println(x))
.map($2 -> i + 1)))
.whenComplete(socket::close);
})
.whenException(e -> {throw new RuntimeException(e);});

eventloop.run();
}

See full example on GitHub

CSP TCP Client

A simple TCP console client that connects to a TCP server:

private void run() {
System.out.println("Connecting to server at localhost (port 9922)...");
eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
if (e == null) {
System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
ITcpSocket socket;
try {
socket = TcpSocket.wrapChannel(getCurrentReactor(), socketChannel, null);
} catch (IOException ioException) {
throw new RuntimeException(ioException);
}

BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket))
.decodeStream(ByteBufsDecoders.ofCrlfTerminatedBytes())
.streamTo(ChannelConsumers.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));

startCommandLineInterface(socket);
} else {
System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
}
});
eventloop.run();
}

public static void main(String[] args) {
new TcpClientExample().run();
}

It sends characters, receives some data back through the CSP channel, parses it and then outputs it to the console.

See full example on GitHub

CSP TCP Server

A simple TCP echo server running in an eventloop:

public static void main(String[] args) throws Exception {
Eventloop eventloop = Eventloop.builder()
.withCurrentThread()
.build();

SimpleServer server = SimpleServer.builder(eventloop, socket ->
BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket))
.decodeStream(ByteBufsDecoders.ofCrlfTerminatedBytes())
.peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
.map(buf -> {
ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
return ByteBufPool.append(serverBuf, buf);
})
.map(buf -> ByteBufPool.append(buf, CRLF))
.streamTo(ChannelConsumers.ofSocket(socket)))
.withListenPort(PORT)
.build();

server.listen();

System.out.println("Server is running");
System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");

eventloop.run();
}

This server listens for connections, and when a client connects, it parses its message and sends it back as a CSP channel via socket.

See full example on GitHub

Datastream TCP Client

graph TB id3-.->id4 subgraph Client id1(produce ints)-->id2(serialize them into bytes) id2-->id3(send bytes over the network) id9(receive those bytes from network)-->id10(deserialize bytes into strings) id10 --> id11(collect those strings in a list) end id8-.->id9 subgraph Server id4(receive bytes from network)-->id5(deserialize bytes into ints) id5-->id6(compute strings from those ints somehow) id6-->id7(serialize strings into bytes) id7-->id8(send those bytes over the network) end

This image illustrates the communication and transformations between TCP client and a server that use Datastream to process data.

public final class TcpClientExample {
public static final int PORT = 9922;

public static void main(String[] args) {
Eventloop eventloop = Eventloop.builder()
.withFatalErrorHandler(rethrow())
.build();

eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
if (e == null) {
ITcpSocket socket;
try {
socket = TcpSocket.wrapChannel(eventloop, socketChannel, null);
} catch (IOException ioEx) {
throw new RuntimeException(ioEx);
}

StreamSuppliers.ofValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.transformWith(ChannelSerializer.create(INT_SERIALIZER))
.streamTo(ChannelConsumers.ofSocket(socket));

ChannelSuppliers.ofSocket(socket)
.transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
.toList()
.whenResult(list -> list.forEach(System.out::println));
} else {
System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
}
});

eventloop.run();
}
}

See full example on GitHub

Datastream TCP Server

This server represents Server from the above illustration:

public final class TcpServerExample {

public static void main(String[] args) throws IOException {
Eventloop eventloop = Eventloop.create();

InetSocketAddress address = new InetSocketAddress("localhost", TcpClientExample.PORT);
ServerSocketSettings socketSettings = ServerSocketSettings.defaultInstance();
eventloop.listen(address, socketSettings,
channel -> {
ITcpSocket socket;

try {
socket = TcpSocket.wrapChannel(eventloop, channel, null);
System.out.println("Client connected: " + channel.getRemoteAddress());
} catch (IOException e) {
throw new RuntimeException(e);
}

ChannelSuppliers.ofSocket(socket)
.transformWith(ChannelDeserializer.create(INT_SERIALIZER))
.transformWith(StreamTransformers.mapper(x -> x + " times 10 = " + x * 10))
.transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
.streamTo(ChannelConsumers.ofSocket(socket));
});

System.out.println("Connect to the server by running datastream.TcpClientExample");

eventloop.run();
}
}

See full example on GitHub