Net
A handy and efficient alternative to Netty, Apache Mina, and similar solutions for high-performance networking. The module is a tiny abstraction layer on top of Eventloop and Java NIO, with adapters for AsyncTcpSocket and AsyncUdpSocket.
Features
- Support of Promises for read and write operations
- Compatibility with CSP ChannelSupplier and ChannelConsumer. AsyncTcpSocket can work as a CSP channel with built-in back pressure propagation, and can be plugged into CSP/Datastream pipeline with all its features (like buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
- Extensively optimized, with almost no performance overhead; makes wide use of ByteBufPool
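To make the buffer pooling idea concrete, here is a minimal sketch of a size-class buffer pool in plain Java. It is not ActiveJ's ByteBufPool: the class name `SimpleBufferPool`, the power-of-two size classes, and the single-threaded design are all assumptions chosen for illustration.

```java
import java.util.ArrayDeque;

/** Hypothetical size-class pool; illustrative only, not ActiveJ's ByteBufPool. */
public final class SimpleBufferPool {
    // Slot k holds recycled arrays of length 2^k (k = 0..30).
    private final ArrayDeque<byte[]>[] freeLists;
    private int reused; // number of allocations served from the pool

    @SuppressWarnings("unchecked")
    public SimpleBufferPool() {
        freeLists = new ArrayDeque[31];
        for (int i = 0; i < freeLists.length; i++) {
            freeLists[i] = new ArrayDeque<>();
        }
    }

    /** Exponent of the smallest power of two that is >= size (size must be >= 1). */
    static int sizeClass(int size) {
        return 32 - Integer.numberOfLeadingZeros(size - 1);
    }

    /** Returns a pooled array if one is available, otherwise allocates a new one. */
    public byte[] allocate(int size) {
        if (size < 1 || size > (1 << 30)) {
            throw new IllegalArgumentException("size out of range: " + size);
        }
        int k = sizeClass(size);
        byte[] pooled = freeLists[k].poll();
        if (pooled != null) {
            reused++;
            return pooled;
        }
        return new byte[1 << k];
    }

    /** Returns an array to the pool; arrays with a non-power-of-two length are ignored. */
    public void recycle(byte[] buf) {
        if (Integer.bitCount(buf.length) == 1) {
            freeLists[sizeClass(buf.length)].push(buf);
        }
    }

    public int reusedCount() {
        return reused;
    }
}
```

Rounding requests up to a size class lets a recycled 1024-byte array serve any later request between 513 and 1024 bytes, which is what keeps allocation pressure low on a busy socket.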
The AbstractServer class serves as a foundation for building Eventloop-aware TCP servers (HTTP servers, RPC servers, TCP file services, etc.):
- Support of start/stop semantics
- Implements EventloopServer interface with listen/close capabilities
- Implements WorkerServer interface, so all subclasses of AbstractServer can be readily used as worker servers
- Support of ServerSocketSettings and SocketSettings
A ready-to-use PrimaryServer implementation works in the primary Eventloop as a balancer. It takes external “accept” requests and redistributes them to the WorkerServers, which then execute the actual “accept” requests in their corresponding worker Eventloop threads.
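The redistribution step can be sketched in plain Java as follows. The text above does not say which balancing strategy PrimaryServer uses, so round-robin is an assumption here, and `RoundRobinDispatcher` is a hypothetical name rather than an ActiveJ class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical balancer: hands each accepted connection to the next worker in rotation. */
public final class RoundRobinDispatcher<C> {
    private final List<Consumer<C>> workers;
    private int next; // index of the worker that receives the next connection

    public RoundRobinDispatcher(List<Consumer<C>> workers) {
        if (workers.isEmpty()) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        this.workers = new ArrayList<>(workers);
    }

    /** Passes the connection to the chosen worker and returns that worker's index. */
    public int dispatch(C connection) {
        int index = next;
        next = (next + 1) % workers.size();
        workers.get(index).accept(connection);
        return index;
    }
}
```

In a real deployment each worker callback would post the connection onto that worker's own event loop thread instead of handling it inline.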
Examples
- Ping-Pong Socket Connection
- CSP TCP Client Example
- CSP TCP Server Example
- Datastream TCP Client Example
- Datastream TCP Server Example
To run the examples, clone ActiveJ locally:
$ git clone https://github.com/activej/activej
Then import it as a Maven project and check out tag v4.0-beta1. Build the project before running the examples.
The examples are located at activej -> examples -> core -> net.
Ping-Pong Socket Connection
In this example we use SimpleServer, an implementation of AbstractServer, which receives a message and sends a response (PONG). We also use AsyncTcpSocketNio to send 3 request messages (PING).
public static void main(String[] args) throws IOException {
    Eventloop eventloop = Eventloop.create().withCurrentThread();
    SimpleServer server = SimpleServer.create(
            socket -> {
                BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
                repeat(() ->
                        bufsSupplier.decode(DECODER)
                                .whenResult(x -> System.out.println(x))
                                .then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
                                .map($ -> true))
                        .whenComplete(socket::close);
            })
            .withListenAddress(ADDRESS)
            .withAcceptOnce();
    server.listen();
    AsyncTcpSocketNio.connect(ADDRESS)
            .whenResult(socket -> {
                BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
                loop(0,
                        i -> i < ITERATIONS,
                        i -> socket.write(wrapAscii(REQUEST_MSG))
                                .then(() -> bufsSupplier.decode(DECODER)
                                        .whenResult(x -> System.out.println(x))
                                        .map($2 -> i + 1)))
                        .whenComplete(socket::close);
            })
            .whenException(e -> { throw new RuntimeException(e); });
    eventloop.run();
}
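For comparison, the same PING/PONG exchange can be sketched with blocking `java.net` sockets. This is not the ActiveJ API. The listing above does not show its constants or DECODER, so the fixed 4-byte ASCII frame is an assumption, and the class name `BlockingPingPong` and the ephemeral loopback port are hypothetical choices for this sketch.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public final class BlockingPingPong {
    static final String REQUEST_MSG = "PING";
    static final String RESPONSE_MSG = "PONG";
    static final int MESSAGE_SIZE = 4; // assumption: every message is a 4-byte ASCII frame

    /** Reads exactly one fixed-size message, blocking until all bytes arrive. */
    static String readMessage(DataInputStream in) throws IOException {
        byte[] frame = new byte[MESSAGE_SIZE];
        in.readFully(frame);
        return new String(frame, StandardCharsets.US_ASCII);
    }

    /** Accepts a single connection and answers each request with RESPONSE_MSG. */
    static void serve(ServerSocket serverSocket, int iterations) {
        try (Socket socket = serverSocket.accept()) {
            DataInputStream in = new DataInputStream(socket.getInputStream());
            OutputStream out = socket.getOutputStream();
            for (int i = 0; i < iterations; i++) {
                readMessage(in);
                out.write(RESPONSE_MSG.getBytes(StandardCharsets.US_ASCII));
                out.flush();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Runs server and client in one process and returns the replies the client received. */
    public static List<String> run(int iterations) {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> serve(serverSocket, iterations));
            server.start();
            List<String> replies = new ArrayList<>();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), serverSocket.getLocalPort())) {
                DataInputStream in = new DataInputStream(socket.getInputStream());
                OutputStream out = socket.getOutputStream();
                for (int i = 0; i < iterations; i++) {
                    out.write(REQUEST_MSG.getBytes(StandardCharsets.US_ASCII));
                    out.flush();
                    replies.add(readMessage(in));
                }
            }
            server.join();
            return replies;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

The blocking version needs a dedicated server thread, whereas the Eventloop version above runs both the server and the client on a single thread.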
CSP TCP Client
A simple TCP console client that connects to a TCP server:
private void run() {
    System.out.println("Connecting to server at localhost (port 9922)...");
    eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
        if (e == null) {
            System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
            AsyncTcpSocket socket;
            try {
                socket = AsyncTcpSocketNio.wrapChannel(getCurrentEventloop(), socketChannel, null);
            } catch (IOException ioException) {
                // Rethrow the caught exception; 'e' is always null in this branch
                throw new RuntimeException(ioException);
            }
            // Print every CRLF-terminated line received from the server
            BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
                    .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
                    .streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));
            startCommandLineInterface(socket);
        } else {
            System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
        }
    });
    eventloop.run();
}

public static void main(String[] args) {
    new TcpClientExample().run();
}
The client sends the text you type, receives data back through a CSP channel, parses it into CRLF-terminated lines, and prints each line to the console.
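The parsing step relies on CRLF framing: bytes arrive in arbitrary chunks, and a line is complete only once `\r\n` has been seen. Below is a minimal plain-Java sketch of that idea. `CrlfLineDecoder` is a hypothetical class written for illustration, not the implementation behind `ByteBufsDecoder.ofCrlfTerminatedBytes()`.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical incremental decoder that splits a byte stream into CRLF-terminated lines. */
public final class CrlfLineDecoder {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean lastWasCr; // true if the last buffered byte is '\r'

    /** Feeds one chunk and returns every line it completes, without the CRLF terminator. */
    public List<String> feed(byte[] chunk) {
        List<String> lines = new ArrayList<>();
        for (byte b : chunk) {
            if (lastWasCr && b == '\n') {
                byte[] bytes = pending.toByteArray();
                // Drop the buffered '\r' that precedes this '\n'
                lines.add(new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8));
                pending.reset();
                lastWasCr = false;
            } else {
                pending.write(b);
                lastWasCr = (b == '\r');
            }
        }
        return lines;
    }
}
```

Because the decoder keeps unfinished bytes between calls, a terminator split across two network reads is still recognized.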
CSP TCP Server
A simple TCP echo server that runs in an Eventloop:
public static void main(String[] args) throws Exception {
    Eventloop eventloop = Eventloop.create().withCurrentThread();
    SimpleServer server = SimpleServer.create(socket ->
            BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
                    .decodeStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
                    .peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
                    .map(buf -> {
                        ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
                        return ByteBufPool.append(serverBuf, buf);
                    })
                    .map(buf -> ByteBufPool.append(buf, CRLF))
                    .streamTo(ChannelConsumer.ofSocket(socket)))
            .withListenPort(PORT);
    server.listen();
    System.out.println("Server is running");
    System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");
    eventloop.run();
}
The server listens for connections. When a client connects, it parses each incoming message and sends it back through the socket as a CSP channel.
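The two `map` steps in the pipeline turn each received line into a reply frame: the `Server> ` prefix, the original bytes, then CRLF. A plain-Java sketch of that transformation, using byte arrays instead of ByteBuf (`EchoReply` is a hypothetical helper, not part of ActiveJ):

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that builds the echo reply for one received line. */
public final class EchoReply {
    private static final byte[] PREFIX = "Server> ".getBytes(StandardCharsets.UTF_8);
    private static final byte[] CRLF = {'\r', '\n'};

    /** Returns prefix + line + CRLF as a single byte array. */
    public static byte[] of(byte[] line) {
        byte[] reply = new byte[PREFIX.length + line.length + CRLF.length];
        System.arraycopy(PREFIX, 0, reply, 0, PREFIX.length);
        System.arraycopy(line, 0, reply, PREFIX.length, line.length);
        System.arraycopy(CRLF, 0, reply, PREFIX.length + line.length, CRLF.length);
        return reply;
    }
}
```

Appending CRLF to the reply is what lets the client's CRLF decoder find the end of each echoed line.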
Datastream TCP Client
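This example shows the communication and transformations between two Datastream servers: Server#1 streams serialized integers to Server#2, which maps each one to a string and streams the results back. The Datastream TCP client represents Server#1: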
public final class TcpClientExample {
    public static final int PORT = 9922;

    public static void main(String[] args) {
        Eventloop eventloop = Eventloop.create().withFatalErrorHandler(rethrowOnAnyError());
        eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
            if (e == null) {
                AsyncTcpSocket socket;
                try {
                    socket = AsyncTcpSocketNio.wrapChannel(eventloop, socketChannel, null);
                } catch (IOException ioEx) {
                    throw new RuntimeException(ioEx);
                }
                StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                        .transformWith(ChannelSerializer.create(INT_SERIALIZER))
                        .streamTo(ChannelConsumer.ofSocket(socket));
                StreamConsumerToList<String> consumer = StreamConsumerToList.create();
                ChannelSupplier.ofSocket(socket)
                        .transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
                        .streamTo(consumer);
                consumer.getResult()
                        .whenResult(list -> list.forEach(System.out::println));
            } else {
                System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
            }
        });
        eventloop.run();
    }
}
Datastream TCP Server
This server represents Server#2, the peer of the client above:
public final class TcpServerExample {
    public static void main(String[] args) throws IOException {
        Eventloop eventloop = Eventloop.create();
        eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {
            AsyncTcpSocket socket;
            try {
                socket = AsyncTcpSocketNio.wrapChannel(eventloop, channel, null);
                System.out.println("Client connected: " + channel.getRemoteAddress());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            ChannelSupplier.ofSocket(socket)
                    .transformWith(ChannelDeserializer.create(INT_SERIALIZER))
                    .transformWith(StreamMapper.create(x -> x + " times 10 = " + x * 10))
                    .transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
                    .streamTo(ChannelConsumer.ofSocket(socket));
        });
        System.out.println("Connect to the server by running datastream.TcpClientExample");
        eventloop.run();
    }
}
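The data flow between the two servers (serialize integers, deserialize and map them, serialize strings, deserialize strings) can be traced without a network using `java.io` data streams. This is only a sketch of the transformation chain. The wire format of `DataOutputStream` is an assumption made for illustration and is not claimed to match ActiveJ's INT_SERIALIZER or UTF8_SERIALIZER, and `TimesTenRoundTrip` is a hypothetical class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public final class TimesTenRoundTrip {
    /** Server#1, outgoing: serializes each integer as 4 big-endian bytes. */
    static byte[] encodeInts(int... values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int value : values) {
                out.writeInt(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Server#2: deserializes integers, maps each to a string, serializes the strings. */
    static byte[] serve(byte[] request) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
             DataOutputStream out = new DataOutputStream(bytes)) {
            while (in.available() > 0) {
                int x = in.readInt();
                out.writeUTF(x + " times 10 = " + x * 10);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Server#1, incoming: deserializes the strings sent back by Server#2. */
    static List<String> decodeStrings(byte[] response) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(response))) {
            while (in.available() > 0) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] request = encodeInts(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        decodeStrings(serve(request)).forEach(System.out::println);
    }
}
```

Each method corresponds to one `transformWith` stage in the listings above; in the Datastream version the stages run incrementally as bytes arrive, while this sketch processes whole arrays at once.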