Net

Net is a handy and efficient alternative to Netty, Apache Mina, and similar solutions for high-performance networking. It is a tiny abstraction layer on top of Eventloop and Java NIO adapters for AsyncTcpSocket and AsyncUdpSocket.

Features

  • Support of Promises for read and write operations
  • Compatibility with CSP ChannelSupplier and ChannelConsumer. AsyncTcpSocket can work as a CSP channel with built-in back pressure propagation, and can be plugged into a CSP/Datastream pipeline with all its features (buffering, compression, serialization/deserialization, data transformations, data filtering, reducing, etc.)
  • Extensively optimized, with almost no performance overhead; makes wide use of ByteBufPool
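To make the buffer-pooling idea in the last bullet concrete, here is a minimal sketch of a pool that hands out recycled byte arrays instead of allocating new ones. It is a plain-JDK illustration only: `SimpleBufferPool` and its methods are hypothetical names, and nothing here reflects how ActiveJ's ByteBufPool is actually implemented.

```java
import java.util.ArrayDeque;

// Hypothetical illustration of buffer pooling; NOT ActiveJ's ByteBufPool.
// Not thread-safe: intended for use from a single thread.
final class SimpleBufferPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private final int bufferSize;
    private int created; // number of arrays that had to be freshly allocated

    SimpleBufferPool(int bufferSize) {
        this.bufferSize = bufferSize;
    }

    /** Returns a recycled buffer if one is available, otherwise allocates a new one. */
    byte[] allocate() {
        byte[] buf = free.poll();
        if (buf == null) {
            created++;
            buf = new byte[bufferSize];
        }
        return buf;
    }

    /** Hands a buffer back so that a later allocate() call can reuse it. */
    void recycle(byte[] buf) {
        if (buf.length == bufferSize) {
            free.push(buf);
        }
    }

    int createdCount() {
        return created;
    }

    public static void main(String[] args) {
        SimpleBufferPool pool = new SimpleBufferPool(1024);
        byte[] first = pool.allocate();
        pool.recycle(first);
        byte[] second = pool.allocate();
        System.out.println(first == second);      // prints "true": the array was reused
        System.out.println(pool.createdCount());  // prints "1": only one allocation happened
    }
}
```

The point of the sketch is that a read/write loop which recycles its buffers creates far less garbage than one that allocates per message.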

The AbstractServer class serves as a foundation for building Eventloop-aware TCP servers (HTTP servers, RPC servers, TCP file services, etc.).

There is also a ready-to-use PrimaryServer implementation, which works in primary Eventloops as a balancer. It takes external “accept” requests and redistributes them to WorkerServers, which then execute the actual “accept” requests in their corresponding worker Eventloop threads.
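The primary/worker split described above can be sketched with plain JDK executors. This is an illustration under stated assumptions, not ActiveJ code: `RoundRobinDispatcher` is a hypothetical name, single-thread executors stand in for worker Eventloops, and the round-robin policy is assumed here purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: a "primary" thread hands accepted connections to worker threads in turn.
final class RoundRobinDispatcher<C> {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final Consumer<C> handler;
    private int next; // touched only by the dispatching (primary) thread

    RoundRobinDispatcher(int workerCount, Consumer<C> handler) {
        for (int i = 0; i < workerCount; i++) {
            workers.add(Executors.newSingleThreadExecutor()); // one thread per worker
        }
        this.handler = handler;
    }

    /** Schedules the connection on the next worker and returns that worker's index. */
    int dispatch(C connection) {
        int index = next;
        next = (next + 1) % workers.size();
        workers.get(index).execute(() -> handler.accept(connection));
        return index;
    }

    /** Lets queued work finish, then stops the workers; returns true if all stopped in time. */
    boolean shutdown() {
        workers.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService worker : workers) {
                if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        RoundRobinDispatcher<String> dispatcher = new RoundRobinDispatcher<>(3,
                c -> System.out.println(Thread.currentThread().getName() + " handles " + c));
        for (int i = 0; i < 6; i++) {
            dispatcher.dispatch("connection-" + i);
        }
        dispatcher.shutdown();
    }
}
```

Each connection is handled entirely on the worker thread it was assigned to, so per-connection state needs no locking.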

Examples

Note: To run the examples, you need to clone ActiveJ from GitHub:
$ git clone https://github.com/activej/activej
Then import it as a Maven project and check out the master branch. Build the project before running the examples.
The examples are located at activej -> examples -> core -> net.

Ping-Pong Socket Connection

This example uses SimpleServer, an implementation of AbstractServer, which receives a message and sends a response (PONG). It also uses AsyncTcpSocketNio to send 3 request messages (PING).

public static void main(String[] args) throws IOException {
	Eventloop eventloop = Eventloop.create().withCurrentThread();

	// Server side: for each accepted socket, repeatedly decode a request, print it, and reply
	SimpleServer server = SimpleServer.create(
			socket -> {
				BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
				repeat(() ->
						bufsSupplier.parse(DECODER)
								.whenResult(x -> System.out.println(x))
								.then(() -> socket.write(wrapAscii(RESPONSE_MSG)))
								.map($ -> true))
						.whenComplete(socket::close);
			})
			.withListenAddress(ADDRESS)
			.withAcceptOnce();

	server.listen();

	// Client side: connect, then send requests one by one, waiting for each response before the next
	AsyncTcpSocketNio.connect(ADDRESS)
			.whenResult(socket -> {
				BinaryChannelSupplier bufsSupplier = BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket));
				loop(0,
						i -> i < ITERATIONS,
						i -> socket.write(wrapAscii(REQUEST_MSG))
								.then(() -> bufsSupplier.parse(DECODER)
										.whenResult(x -> System.out.println(x))
										.map($2 -> i + 1)))
						.whenComplete(socket::close);
			})
			.whenException(e -> { throw new RuntimeException(e); });

	eventloop.run();
}

See full example on GitHub
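The client in the snippet above relies on a `loop(seed, condition, next)` helper to run asynchronous steps strictly one after another. As a rough analogy only, the same control flow can be expressed with the JDK's `CompletableFuture`. `AsyncLoopSketch`, its `loop` method, and the simulated PING/PONG log are all hypothetical; this is not ActiveJ's implementation and it uses no real sockets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical JDK-only analogy for a sequential asynchronous loop.
final class AsyncLoopSketch {
    /** Applies next while the condition holds; completes with the first value that fails it. */
    static <T> CompletableFuture<T> loop(T seed, Predicate<T> condition,
                                         Function<T, CompletableFuture<T>> next) {
        if (!condition.test(seed)) {
            return CompletableFuture.completedFuture(seed);
        }
        // The next iteration starts only after the current step has completed.
        return next.apply(seed).thenCompose(value -> loop(value, condition, next));
    }

    /** Simulates the client: "write" PING, "receive" PONG, then advance the counter. */
    static List<String> pingPong(int iterations) {
        List<String> log = new ArrayList<>();
        loop(0,
                i -> i < iterations,
                i -> CompletableFuture
                        .supplyAsync(() -> {
                            log.add("PING");   // stands in for the socket write
                            return "PONG";     // stands in for the parsed response
                        })
                        .thenApply(reply -> {
                            log.add(reply);
                            return i + 1;
                        }))
                .join(); // block only so that this demo can return the log
        return log;
    }

    public static void main(String[] args) {
        System.out.println(pingPong(3)); // prints [PING, PONG, PING, PONG, PING, PONG]
    }
}
```

Because each step is chained onto the completion of the previous one, the requests never overlap, which is why the log alternates strictly between PING and PONG.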

CSP TCP Client

A simple TCP console client that connects to a TCP server:

private void run() {
	System.out.println("Connecting to server at localhost (port 9922)...");
	eventloop.connect(new InetSocketAddress("localhost", 9922), (socketChannel, e) -> {
		if (e == null) {
			System.out.println("Connected to server, enter some text and send it by pressing 'Enter'.");
			AsyncTcpSocket socket = AsyncTcpSocketNio.wrapChannel(getCurrentEventloop(), socketChannel, null);

			BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
					.parseStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
					.streamTo(ChannelConsumer.ofConsumer(buf -> System.out.println(buf.asString(UTF_8))));

			startCommandLineInterface(socket);
		} else {
			System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
		}
	});
	eventloop.run();
}

public static void main(String[] args) {
	new TcpClientExample().run();
}

It sends characters, receives data back through a CSP channel, parses it, and then prints it to the console.

See full example on GitHub
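The client above splits the incoming byte stream into messages at CRLF boundaries. Since TCP delivers bytes in arbitrary chunks, a decoder of this kind has to buffer incomplete input between reads. The following JDK-only sketch shows that idea; `CrlfFramer` is a hypothetical class and is not how `ByteBufsDecoder.ofCrlfTerminatedBytes()` is implemented.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of CRLF framing over a chunked byte stream.
final class CrlfFramer {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean lastWasCr;

    /** Feeds one chunk as it arrived from the network; returns the messages it completed. */
    List<String> feed(byte[] chunk) {
        List<String> messages = new ArrayList<>();
        for (byte b : chunk) {
            if (lastWasCr && b == '\n') {
                byte[] all = pending.toByteArray();
                // Drop the trailing CR that was buffered on the previous byte.
                messages.add(new String(all, 0, all.length - 1, StandardCharsets.UTF_8));
                pending.reset();
                lastWasCr = false;
            } else {
                pending.write(b);
                lastWasCr = (b == '\r');
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        CrlfFramer framer = new CrlfFramer();
        String[] chunks = {"PI", "NG\r\nPO", "NG\r\n"};
        for (String chunk : chunks) {
            // prints [], then [PING], then [PONG]
            System.out.println(framer.feed(chunk.getBytes(StandardCharsets.UTF_8)));
        }
    }
}
```

A message is emitted only once its terminator has been seen, even when the CR and the LF arrive in different chunks.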

CSP TCP Server

A simple TCP echo server that runs in an Eventloop:

public static void main(String[] args) throws Exception {
	Eventloop eventloop = Eventloop.create().withCurrentThread();

	SimpleServer server = SimpleServer.create(socket ->
			BinaryChannelSupplier.of(ChannelSupplier.ofSocket(socket))
					.parseStream(ByteBufsDecoder.ofCrlfTerminatedBytes())
					.peek(buf -> System.out.println("client:" + buf.getString(UTF_8)))
					.map(buf -> {
						ByteBuf serverBuf = ByteBufStrings.wrapUtf8("Server> ");
						return ByteBufPool.append(serverBuf, buf);
					})
					.map(buf -> ByteBufPool.append(buf, CRLF))
					.streamTo(ChannelConsumer.ofSocket(socket)))
			.withListenPort(PORT);

	server.listen();

	System.out.println("Server is running");
	System.out.println("You can connect from telnet with command: telnet localhost 9922 or by running csp.TcpClientExample");

	eventloop.run();
}

This server listens for connections. When a client connects, it parses each of the client's messages and sends it back through a CSP channel over the socket.

See full example on GitHub
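The two `map` steps in the server above can be read as a single function from a decoded message to the bytes written back: prepend "Server> " and append CRLF. Here is that transformation as a standalone JDK-only sketch; `EchoReply` is a hypothetical name, and plain byte arrays stand in for ByteBuf.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the per-message echo transformation, using plain byte arrays.
final class EchoReply {
    private static final byte[] PREFIX = "Server> ".getBytes(StandardCharsets.UTF_8);
    private static final byte[] CRLF = {'\r', '\n'};

    /** Builds the reply: prefix, then the original message, then the CRLF terminator. */
    static byte[] reply(byte[] message) {
        byte[] out = new byte[PREFIX.length + message.length + CRLF.length];
        System.arraycopy(PREFIX, 0, out, 0, PREFIX.length);
        System.arraycopy(message, 0, out, PREFIX.length, message.length);
        System.arraycopy(CRLF, 0, out, PREFIX.length + message.length, CRLF.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] out = reply("hello".getBytes(StandardCharsets.UTF_8));
        // Writes "Server> hello" followed by a CRLF terminator.
        System.out.print(new String(out, StandardCharsets.UTF_8));
    }
}
```

Re-appending CRLF matters because the decoder on the receiving side strips the terminator, and the client needs it again to find the end of each reply.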

Datastream TCP Client

This image illustrates the communication and transformations between two Datastream servers. The Datastream TCP client represents Server#1:

public final class TcpClientExample {
	public static final int PORT = 9922;

	public static void main(String[] args) {
		Eventloop eventloop = Eventloop.create().withFatalErrorHandler(rethrowOnAnyError());

		eventloop.connect(new InetSocketAddress("localhost", PORT), (socketChannel, e) -> {
			if (e == null) {
				AsyncTcpSocket socket = AsyncTcpSocketNio.wrapChannel(eventloop, socketChannel, null);

				StreamSupplier.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
						.transformWith(ChannelSerializer.create(INT_SERIALIZER))
						.streamTo(ChannelConsumer.ofSocket(socket));

				StreamConsumerToList<String> consumer = StreamConsumerToList.create();

				ChannelSupplier.ofSocket(socket)
						.transformWith(ChannelDeserializer.create(UTF8_SERIALIZER))
						.streamTo(consumer);

				consumer.getResult()
						.whenResult(list -> list.forEach(System.out::println));

			} else {
				System.out.printf("Could not connect to server, make sure it is started: %s%n", e);
			}
		});

		eventloop.run();
	}
}

See full example on GitHub

Datastream TCP Server

This server represents Server#2 from the illustration above:

public final class TcpServerExample {

	public static void main(String[] args) throws IOException {
		Eventloop eventloop = Eventloop.create();

		eventloop.listen(new InetSocketAddress("localhost", TcpClientExample.PORT), ServerSocketSettings.create(100), channel -> {
			AsyncTcpSocket socket = AsyncTcpSocketNio.wrapChannel(eventloop, channel, null);

			try {
				System.out.println("Client connected: " + channel.getRemoteAddress());
			} catch (IOException e) {
				e.printStackTrace();
			}

			ChannelSupplier.ofSocket(socket)
					.transformWith(ChannelDeserializer.create(INT_SERIALIZER))
					.transformWith(StreamMapper.create(x -> x + " times 10 = " + x * 10))
					.transformWith(ChannelSerializer.create(UTF8_SERIALIZER))
					.streamTo(ChannelConsumer.ofSocket(socket));
		});

		System.out.println("Connect to the server by running datastream.TcpClientExample");

		eventloop.run();
	}
}

See full example on GitHub
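Taken together, the two Datastream examples form a round trip: integers are serialized on the client, deserialized and mapped to text on the server, and the text is sent back. The sketch below reproduces that data flow in memory with JDK streams only. `TimesTenPipeline` is a hypothetical class, and the 4-byte big-endian encoding is an assumption made for illustration; it is not the wire format produced by ChannelSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory sketch of the serialize -> deserialize -> map flow.
final class TimesTenPipeline {
    /** "Client" side: writes each int as 4 big-endian bytes (assumed format). */
    static byte[] serializeInts(int... values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int value : values) {
                out.writeInt(value);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** "Server" side: reads the ints back and maps each one to the same text as the example. */
    static List<String> transform(byte[] payload) {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            while (in.available() >= Integer.BYTES) {
                int x = in.readInt();
                result.add(x + " times 10 = " + x * 10);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    public static void main(String[] args) {
        // prints "1 times 10 = 10", "2 times 10 = 20", "3 times 10 = 30" on separate lines
        transform(serializeInts(1, 2, 3)).forEach(System.out::println);
    }
}
```

In the real examples the same stages run incrementally over a socket, so the server can start replying before the client has finished sending.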