Net
Overview
A convenient and efficient alternative to Netty, Apache Mina, and other similar solutions for high-performance networking. It is a tiny layer of abstraction on top of the reactor (Eventloop) and Java NIO, providing adapters for TCP socket along with UDP socket.
Features
- Support of Promises for read and write operations
- Compatibility with CSP ChannelSupplier and ChannelConsumer. TcpSocket can operate as a CSP channel with built-in backpressure propagation, and can be plugged into a CSP/Datastream pipeline with all its features (such as buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
- Significantly optimized, with almost no performance overhead; makes extensive use of ByteBufPool
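To make "built-in backpressure propagation" more concrete, here is a minimal sketch of a pull-based channel written against plain JDK types. It is not ActiveJ's CSP API: the `Source` and `Sink` interfaces, `streamTo` and `trace` are hypothetical names chosen for illustration, and `CompletableFuture` stands in for a promise. The point it shows is that the next item is requested only after the sink has finished with the previous one, so a slow consumer naturally slows the producer down.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical plain-JDK analogue of a pull-based channel; not ActiveJ's CSP API.
final class BackpressureSketch {
    /** Yields the next item, or null at end of stream. */
    interface Source<T> { CompletableFuture<T> get(); }

    /** Completes the returned future once the item has been fully handled. */
    interface Sink<T> { CompletableFuture<Void> accept(T item); }

    /** Pulls the next item only after the sink has finished with the previous one. */
    static <T> CompletableFuture<Void> streamTo(Source<T> source, Sink<T> sink) {
        return source.get().thenCompose(item -> {
            if (item == null) return CompletableFuture.<Void>completedFuture(null);
            return sink.accept(item).thenCompose(done -> streamTo(source, sink));
        });
    }

    /** Streams three items and records the order of pulls and writes. */
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Iterator<String> items = List.of("a", "b", "c").iterator();
        Source<String> source = () -> {
            String item = items.hasNext() ? items.next() : null;
            log.add("get " + item);
            return CompletableFuture.completedFuture(item);
        };
        Sink<String> sink = item -> {
            log.add("accept " + item);
            return CompletableFuture.completedFuture(null);
        };
        streamTo(source, sink).join();
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

In the recorded trace every `get` is followed by the matching `accept` before the next `get` happens, which is the ordering a backpressure-aware pipeline relies on.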
Reactive sockets
A fully reactive TCP socket with TLS support. It allows you to send/receive data to/from the network. It can be used as a building block for creating custom TCP servers/clients or implementing custom networking protocols. The socket has a simple and intuitive API consisting of read/write methods.
The CSP module can be used to wrap a socket into a ChannelSupplier or ChannelConsumer.
There is also an asynchronous UDP socket for UDP communications.
Server
The AbstractReactiveServer class is the basis for building reactor-based TCP servers (HTTP servers, RPC servers, TCP file services, etc.):
- Support of start/stop semantics
- Implements ReactiveServer
- Implements the WorkerServer interface, so all AbstractReactiveServer subclasses can be used as worker servers right away
- Support of ServerSocketSettings and SocketSettings
There is also a ready-to-use PrimaryServer implementation that works in the primary reactor as a balancer. It takes external “accept” requests and redistributes them to the WorkerServers, which then execute the actual “accept” requests in their corresponding worker reactor threads.
Examples
To run the examples, clone ActiveJ from GitHub:
git clone https://github.com/activej/activej
Import it as a Maven project and check out tag v6.0-beta2. Before running the examples, build the project.
These examples are located at activej/examples/core/net.
Ping-Pong Socket Connection
In this example we will use the AbstractReactiveServer implementation, SimpleServer, which receives a message and sends a response (PONG). We will also use a TCP socket as a client to send 3 request messages (PING).
public static void main(String[] args) throws IOException {
    Eventloop eventloop = Eventloop.builder()
        .withCurrentThread()
        .build();
    // Server side: decode a request, print it, reply with RESPONSE_MSG, repeat
    SimpleServer server = SimpleServer.builder(
            eventloop,
            socket -> {
                BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket));
                repeat(() ->
                    bufsSupplier.decode(DECODER)
                        .whenResult(x -> System.out.println(x))
                        .then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
                        .map($ -> true))
                    .whenComplete(socket::close);
            })
        .withListenAddress(ADDRESS)
        .withAcceptOnce()
        .build();
    server.listen();
    // Client side: send REQUEST_MSG and print the response, ITERATIONS times
    TcpSocket.connect(eventloop, ADDRESS)
        .whenResult(socket -> {
            BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket));
            loop(0,
                i -> i < ITERATIONS,
                i -> socket.write(wrapAscii(REQUEST_MSG))
                    .then(() -> bufsSupplier.decode(DECODER)
                        .whenResult(x -> System.out.println(x))
                        .map($2 -> i + 1)))
                .whenComplete(socket::close);
        })
        .whenException(e -> {throw new RuntimeException(e);});
    eventloop.run();
}
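The client in the example above drives its requests with a promise-based loop: each iteration starts only after the previous response has arrived. The following sketch reproduces that control flow on plain JDK types so it can be run without ActiveJ. The `loop`, `exchange` and `pingPong` names are hypothetical, `CompletableFuture` stands in for a promise, and `exchange` is a stand-in for a real network round trip.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical plain-JDK analogue of a promise-based loop; not ActiveJ's API.
final class AsyncLoopSketch {
    /** Applies next to the current value until test fails; each step waits for the previous one. */
    static <T> CompletableFuture<T> loop(T seed, Predicate<T> test,
                                         Function<T, CompletableFuture<T>> next) {
        if (!test.test(seed)) return CompletableFuture.completedFuture(seed);
        return next.apply(seed).thenCompose(value -> loop(value, test, next));
    }

    /** Stand-in for a server round trip: answers every request with PONG. */
    static CompletableFuture<String> exchange(String request) {
        return CompletableFuture.supplyAsync(() -> "PONG");
    }

    /** Sends PING the given number of times and records requests and responses in order. */
    static List<String> pingPong(int iterations) {
        List<String> log = new ArrayList<>();
        loop(0, i -> i < iterations, i -> {
            log.add("PING");
            return exchange("PING").thenApply(response -> {
                log.add(response);
                return i + 1;
            });
        }).join();
        return log;
    }

    public static void main(String[] args) {
        pingPong(3).forEach(System.out::println);
    }
}
```

Because every iteration is chained onto the completion of the previous one, requests and responses strictly alternate in the log even though the responses are produced asynchronously.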
CSP TCP Client
A simple TCP console client that connects to a TCP server:
private void run() {
    System.out.println("Connecting to server at localhost (port 9922)...");
    eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
        if (e == null) {
            System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
            ITcpSocket socket;
            try {
                socket = TcpSocket.wrapChannel(getCurrentReactor(), socketChannel, null);
            } catch (IOException ioException) {
                throw new RuntimeException(ioException);
            }
            // Print every CRLF-terminated message received from the server
            BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket))
                .decodeStream(ByteBufsDecoders.ofCrlfTerminatedBytes())
                .streamTo(ChannelConsumers.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));
            startCommandLineInterface(socket);
        } else {
            System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
        }
    });
    eventloop.run();
}
public static void main(String[] args) {
    new TcpClientExample().run();
}
The client sends the text you enter, receives data back through the CSP channel, splits it into CRLF-terminated messages, and prints each message to the console.
CSP TCP Server
A simple TCP echo server running in an eventloop:
public static void main(String[] args) throws Exception {
    Eventloop eventloop = Eventloop.builder()
        .withCurrentThread()
        .build();
    // For each connection: split input into CRLF-terminated messages,
    // prefix each with "Server> ", re-append CRLF and stream it back
    SimpleServer server = SimpleServer.builder(eventloop, socket ->
            BinaryChannelSupplier.of(ChannelSuppliers.ofSocket(socket))
                .decodeStream(ByteBufsDecoders.ofCrlfTerminatedBytes())
                .peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
                .map(buf -> {
                    ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
                    return ByteBufPool.append(serverBuf, buf);
                })
                .map(buf -> ByteBufPool.append(buf, CRLF))
                .streamTo(ChannelConsumers.ofSocket(socket)))
        .withListenPort(PORT)
        .build();
    server.listen();
    System.out.println("Server is running");
    System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");
    eventloop.run();
}
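This server listens for connections. When a client connects, the server splits the incoming data into CRLF-terminated messages, prefixes each message with "Server> ", and streams it back to the client through a CSP channel over the same socket.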
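The framing step is the part that is easiest to get wrong, because a single message may arrive split across several network chunks. The sketch below shows the same idea with plain JDK classes only: it buffers bytes until a CRLF is seen and then produces the echo reply. It is not how ByteBufsDecoders is implemented; the class name `CrlfEchoSketch` and the `feed` method are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-JDK sketch of CRLF framing plus the echo transformation.
final class CrlfEchoSketch {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean lastWasCr; // true if the most recently buffered byte was '\r'

    /** Feeds one network chunk and returns a reply for every complete CRLF-terminated message. */
    List<String> feed(byte[] chunk) {
        List<String> replies = new ArrayList<>();
        for (byte b : chunk) {
            if (b == '\n' && lastWasCr) {
                byte[] line = pending.toByteArray();
                pending.reset();
                lastWasCr = false;
                // Drop the buffered '\r' before decoding the message text.
                String message = new String(line, 0, line.length - 1, StandardCharsets.UTF_8);
                replies.add("Server> " + message + "\r\n");
            } else {
                pending.write(b);
                lastWasCr = (b == '\r');
            }
        }
        return replies;
    }

    public static void main(String[] args) {
        CrlfEchoSketch echo = new CrlfEchoSketch();
        // "hello" arrives in two chunks; the reply appears only once CRLF is seen.
        System.out.print(echo.feed("hel".getBytes(StandardCharsets.UTF_8)));
        System.out.print(echo.feed("lo\r\n".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Bytes are buffered rather than decoded chunk by chunk, so a multi-byte UTF-8 character that straddles two chunks is still decoded correctly once the whole message is available.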
Datastream TCP Client
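This image illustrates the communication and transformations between a TCP client and a server that use Datastream to process data. The client serializes the integers 1 to 10, streams them to the server, and prints the strings it receives in response: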
public final class TcpClientExample {
    public static final int PORT = 9922;
    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.builder()
            .withFatalErrorHandler(rethrow())
            .build();
        eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
            if (e == null) {
                ITcpSocket socket;
                try {
                    socket = TcpSocket.wrapChannel(eventloop, socketChannel, null);
                } catch (IOException ioEx) {
                    throw new RuntimeException(ioEx);
                }
                // Outgoing: integers -> serialized bytes -> socket
                StreamSuppliers.ofValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                    .transformWith(ChannelSerializer.create(INT_SERIALIZER))
                    .streamTo(ChannelConsumers.ofSocket(socket));
                // Incoming: socket -> deserialized strings -> printed list
                ChannelSuppliers.ofSocket(socket)
                    .transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
                    .toList()
                    .whenResult(list -> list.forEach(System.out::println));
            } else {
                System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
            }
        });
        eventloop.run();
    }
}
Datastream TCP Server
This server represents the Server from the above illustration:
public final class TcpServerExample {
    public static void main(String[] args) throws IOException {
        Eventloop eventloop = Eventloop.create();
        InetSocketAddress address = new InetSocketAddress("localhost", TcpClientExample.PORT);
        ServerSocketSettings socketSettings = ServerSocketSettings.defaultInstance();
        eventloop.listen(address, socketSettings,
            channel -> {
                ITcpSocket socket;
                try {
                    socket = TcpSocket.wrapChannel(eventloop, channel, null);
                    System.out.println("Client connected: " + channel.getRemoteAddress());
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                // socket -> integers -> "x times 10 = ..." strings -> serialized bytes -> socket
                ChannelSuppliers.ofSocket(socket)
                    .transformWith(ChannelDeserializer.create(INT_SERIALIZER))
                    .transformWith(StreamTransformers.mapper(x -> x + " times 10 = " + x * 10))
                    .transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
                    .streamTo(ChannelConsumers.ofSocket(socket));
            });
        System.out.println("Connect to the server by running datastream.TcpClientExample");
        eventloop.run();
    }
}
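To see what data the two Datastream examples exchange without starting a server, the pipeline can be imitated in memory with plain JDK streams. This is only a sketch: the class and method names are hypothetical, and the wire format (4-byte big-endian integers written by `DataOutputStream`) is an assumption for illustration, not the encoding that ActiveJ's serializers actually produce.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory imitation of the client/server pipeline; not ActiveJ's API.
final class DatastreamPipelineSketch {
    /** Client side: encodes integers as bytes (4-byte big-endian here, an assumed format). */
    static byte[] serialize(List<Integer> values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int value : values) {
                out.writeInt(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Server side: decodes the integers and maps each one with the same expression as the server's mapper. */
    static List<String> process(byte[] wire) {
        List<String> replies = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            while (in.available() > 0) {
                int x = in.readInt();
                replies.add(x + " times 10 = " + x * 10);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return replies;
    }

    public static void main(String[] args) {
        process(serialize(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))).forEach(System.out::println);
    }
}
```

The mapping expression is copied from the server example, so the strings produced here match the text the client is expected to print for the same input values.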