Dataflow

Dataflow is a distributed stream-based batch processing engine for big data applications. You create tasks on a client and post them to Dataflow servers that hold the datasets. A task is compiled into an execution graph that is executed on the partitions.

Example

In this example we will post a simple Map-Reduce task to a cluster of Dataflow nodes.

Note: To run the examples, you need to clone ActiveJ from GitHub:
$ git clone https://github.com/activej/activej
Then import it as a Maven project and check out the master branch. Before running the examples, build the project.
These examples are located at activej -> examples -> core -> dataflow.

1. Create a Dataflow Server Launcher

First, we need to launch two Dataflow servers. Each of them will have its own “items” dataset containing 10K random words; the two datasets may overlap. To create a Dataflow server launcher, we’ll use the predefined DataflowServerLauncher class, which extends the Launcher class.

public final class DataflowServerLauncherExample extends DataflowServerLauncher {

	@Override
	protected Module getOverrideModule() {
		return ModuleBuilder.create()
				// codecs that allow the stateless user-defined functions to be serialized
				// and sent between the client and the servers as part of the task graph
				.bind(codec(CreateStringCountFunction.class)).toInstance(ofObject(CreateStringCountFunction::new))
				.bind(codec(ExtractStringFunction.class)).toInstance(ofObject(ExtractStringFunction::new))
				.bind(codec(StringCountReducer.class)).toInstance(ofObject(StringCountReducer::new))

				// stub secondary storage for the sorter, sufficient for this example
				.bind(StreamSorterStorageFactory.class).toInstance(StreamMergeSorterStorageStub.FACTORY_STUB)

				.bind(Config.class).toInstance(
						Config.create()
								// listen port is taken from the first program argument, default 9000
								.with("dataflow.server.listenAddresses", args.length > 0 ? args[0] : "9000")
								.with("dataflow.secondaryBufferPath", Util.createTempDir("dataflow-server-secondary-storage")))
				.build();
	}

	// provides the "items" dataset: the non-empty lines of a resource file
	// (second program argument, default words1.txt)
	@Provides
	@DatasetId("items")
	List<String> words() {
		String file = args.length > 1 ? args[1] : "words1.txt";
		return new BufferedReader(new InputStreamReader(getClass().getResourceAsStream(file)))
				.lines()
				.filter(s -> !s.isEmpty())
				.collect(toList());
	}

	public static void main(String[] args) throws Exception {
		new DataflowServerLauncherExample().launch(args);
	}
}

First, we override the getOverrideModule method to create a module with all the required bindings: codecs for CreateStringCountFunction, ExtractStringFunction, and StringCountReducer, along with StreamSorterStorageFactory and Config. Next, the words method provides the “items” dataset by reading a list of non-empty Strings from a .txt resource file.
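
CreateStringCountFunction, ExtractStringFunction, and StringCount are small helper classes defined in the example sources. Here is a minimal sketch of what they may look like, assuming each lives in its own file and that plain java.util.function.Function is the expected function type (the shapes below are assumptions; see the actual implementations in the repository):

// A ('word', count) pair
public class StringCount {
	final String s;
	int count;

	public StringCount(String s, int count) {
		this.s = s;
		this.count = count;
	}

	@Override
	public String toString() {
		return "StringCount{s='" + s + "', count=" + count + '}';
	}
}

// Maps a word into a ('word', 1) pair
public class CreateStringCountFunction implements Function<String, StringCount> {
	@Override
	public StringCount apply(String s) {
		return new StringCount(s, 1);
	}
}

// Extracts the sort/reduce key (the word itself) from a pair
public class ExtractStringFunction implements Function<StringCount, String> {
	@Override
	public String apply(StringCount stringCount) {
		return stringCount.s;
	}
}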

Finally, we define the main method that starts our launcher.

See Dataflow server launcher source code on GitHub

2. Create a Dataflow Client Launcher with a task

Let’s now create a Dataflow client launcher, built by analogy with the server launcher: we override getOverrideModule and provide the same required bindings:

public final class DataflowClientLauncherExample extends DataflowClientLauncher {
	private static final String DEFAULT_PARTITION = "127.0.0.1:9000";

	@Inject
	DataflowClient client;

	@Inject
	DataflowGraph graph;

	@Inject
	Eventloop eventloop;

	@Override
	protected Module getOverrideModule() {
		return ModuleBuilder.create()
				// the client registers the same codecs as the servers
				.bind(codec(CreateStringCountFunction.class)).toInstance(ofObject(CreateStringCountFunction::new))
				.bind(codec(ExtractStringFunction.class)).toInstance(ofObject(ExtractStringFunction::new))
				.bind(codec(StringCountReducer.class)).toInstance(ofObject(StringCountReducer::new))

				.bind(StreamSorterStorageFactory.class).toInstance(StreamMergeSorterStorageStub.FACTORY_STUB)

				.bind(Config.class).toInstance(
						Config.create()
								.with("dataflow.secondaryBufferPath", Util.createTempDir("dataflow-client-secondary-storage"))
								// addresses of the partitions, taken from program arguments, default 127.0.0.1:9000
								.with("dataflow.partitions", args.length == 0 ? DEFAULT_PARTITION : String.join(",", args)))
				.build();
	}
}

Now let’s create a task for our Dataflow partitions. We’ll define it in the overridden run method, the Launcher’s main business-logic method:

@Override
protected void run() throws InterruptedException {
	eventloop.execute(() -> {
		StringCountReducer reducer = new StringCountReducer();
		ExtractStringFunction keyFunction = new ExtractStringFunction();

		// refer to the "items" dataset that each server provides under @DatasetId("items")
		Dataset<String> items = datasetOfId("items", String.class);

		// step 1: map each word into a ('word', 1) pair
		Dataset<StringCount> mappedItems = map(items, new CreateStringCountFunction(), StringCount.class);

		// step 2: sort the pairs alphabetically within each partition
		LocallySortedDataset<String, StringCount> locallySorted = localSort(mappedItems, String.class, keyFunction, naturalOrder());

		// step 3: reduce pairs with equal keys within each partition
		LocallySortedDataset<String, StringCount> locallyReduced = localReduce(locallySorted, reducer.inputToAccumulator(), StringCount.class, keyFunction);

		// steps 4-5: repartition the pairs by key across the cluster and reduce again
		Dataset<StringCount> reducedItems = repartitionReduce(locallyReduced, reducer.accumulatorToOutput(), StringCount.class);

		// step 6: pull the sorted streams from all the nodes and merge them on the client
		MergeCollector<String, StringCount> collector = new MergeCollector<>(reducedItems, client, keyFunction, naturalOrder(), false);

		StreamSupplier<StringCount> resultSupplier = collector.compile(graph);

		// step 7: collect the merged stream into a list
		StreamConsumerToList<StringCount> resultConsumer = StreamConsumerToList.create();

		// print the dataset graph and the compiled node graph in GraphViz format
		System.out.println("\n *** Dataset graph:\n");
		System.out.println(reducedItems.toGraphViz());
		System.out.println("\n *** Compiled nodes graph:\n");
		System.out.println(graph.toGraphViz());

		// execute the graph on the partitions and stream the merged result into the list
		graph.execute().both(resultSupplier.streamTo(resultConsumer))
				.whenException(Throwable::printStackTrace)
				.whenResult(() -> {
					System.out.println("Top 100 words:");
					resultConsumer.getList().stream().limit(100).forEach(System.out::println);
				})
				.whenComplete(this::shutdown);
	});

	awaitShutdown();
}

public static void main(String[] args) throws Exception {
	new DataflowClientLauncherExample().launch(args);
}

This code does the following:

  1. Maps each string into a ('word', 1) pair
  2. Sorts the pairs alphabetically by their String keys
  3. Reduces the pairs by merging pairs with the same word. For example, (apple, 1) and (apple, 1) are reduced into (apple, 2) (see the merge sketch after this list)
  4. Distributes the pairs according to a provided rule. For example, if partition 1 contains (apple, 2), (dog, 2) and partition 2 contains (apple, 3), (dog, 1), the result of repartitioning might be (apple, 2), (apple, 3) on partition 1 and (dog, 2), (dog, 1) on partition 2.
  5. Repeats step 3. After these steps, each partition holds a sorted, reduced dataset whose keys are unique across all the partitions
  6. With the help of the collector, the client pulls streams from all the nodes and merges them into a single sorted stream
  7. The stream is collected into a list that we can work with. In this example we simply print out the first 100 words
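
Conceptually, the reducer in steps 3 and 5 merges pairs with equal keys by summing their counts. Here is a plain-Java sketch of that merge logic (the actual StringCountReducer expresses the same idea through ActiveJ's reducer API; see the example sources on GitHub):

// Merges two pairs with the same word into one pair with the summed count,
// e.g. merge((apple, 1), (apple, 1)) -> (apple, 2)
static StringCount merge(StringCount a, StringCount b) {
	assert a.s.equals(b.s); // only pairs with equal keys are merged
	return new StringCount(a.s, a.count + b.count);
}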

Finally, we define the main method that launches the client.

See Dataflow client launcher source code on GitHub

Testing

First, launch two Dataflow servers, specifying each node’s listen port and source file as program arguments: 9000 words1.txt for the first server and 9001 words2.txt for the second. Next, launch the Dataflow client to post the created task to the servers, specifying the ports of the partitions as program arguments: 9000 9001.

After everything is launched and the task is executed, the console will show the list of the first 100 words collected from the two Dataflow servers. All the data flows between partitions are printed to stdout in GraphViz format, so they can be rendered as a graph.