Apache Flink is a big data framework that allows programmers to process huge amounts of data in a highly efficient and scalable way. This article will introduce some basic API concepts and standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink’s central construct – the distributed collection. First, we’ll look at Flink’s DataSet API transformations and use them to implement a word-counting program. Then we’ll take a brief look at Flink’s DataStream API, which allows you to process streams of events in real time.
Maven Dependencies
To get started, we’ll need to add the Maven dependencies for the flink-java and flink-test-utils libraries:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test</scope>
</dependency>
Basic API Concepts
When working with Flink, we need to know a few things related to its API:
Every Flink program performs transformations on distributed collections of data. A variety of data transformation functions are available, including filtering, mapping, joining, grouping, and aggregating.
A sink operation in Flink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system or printing it to standard output.
Flink transformations are lazy and are not executed until a sink operation is invoked.
The API has two modes of operation, i.e., batch and real-time. If you’re dealing with a limited data source that can be processed in batch mode, you’ll use the DataSet API. To process unlimited data streams in real-time, you must use the DataStream API.
DataSet API Transformations
The entry point to a Flink program is an instance of the ExecutionEnvironment class — this defines the context in which the program is executed.
Let’s create an ExecutionEnvironment to start processing:
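A minimal sketch of obtaining the environment looks like this (getExecutionEnvironment() is the standard Flink factory method):

```java
import org.apache.flink.api.java.ExecutionEnvironment;

// Returns a local environment when run from an IDE,
// or the cluster environment when submitted to a Flink cluster.
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
```
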
Note that running the application on the local machine will do the processing on the local JVM. If you wanted to start processing on a cluster of machines, you would need to install Apache Flink and configure the ExecutionEnvironment accordingly.
Create a dataset
We need to supply data to our program to perform data transformations.
You can create a DataSet from multiple sources, such as Apache Kafka, CSV, a file, or virtually any other data source.
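As a minimal example, assuming the env variable created earlier, a small in-memory DataSet of integers can be built with fromElements():

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// An in-memory source; in a real job this could be a CSV file,
// a Kafka topic, or any other connector instead.
DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
```
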
Filter and reduce
Let’s say you want to filter out numbers above a certain threshold and then sum them all. You can use filter() and reduce() transformations to achieve this:
int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);
Note that the collect() method is the sink operation that initiates the actual data transformations.
Map
Let’s say you have a DataSet of Person objects:
private static class Person {
private int age;
private String name;
// standard constructors/getters/setters
}
Next, we create a DataSet from these objects:
DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));
Suppose you want to extract only the age field from each object in the collection. You can use the map() transformation to get only specific fields of the Person class:
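A sketch of that extraction, assuming the personDataSource defined above (the age field is accessed directly here, which works when the lambda lives in the same class as Person):

```java
import java.util.List;

// Extract just the age field from every Person in the DataSet.
List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();
```
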
Sort
Let’s say you have the following collection of Tuple2 elements:
Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");

DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);
If you want to sort this collection by the first field of the tuple, you can use the sortPartition() transformation:
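A sketch of that sort, assuming the transactions DataSet above; with a single local partition, sorting each partition yields a fully sorted result:

```java
import org.apache.flink.api.common.operators.Order;
import java.util.List;

// Sort each partition by the first tuple field (f0), ascending.
List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(0, Order.ASCENDING)
  .collect();
```
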
Word Count
The word count problem is commonly used to demonstrate the capabilities of big data processing frameworks. The basic solution involves counting the occurrences of words in the text input.
As the first step in our solution, we create a LineSplitter class that splits our input into tokens (words), emitting a Tuple2 key-value pair for each token. In each of these tuples, the key is a word found in the text, and the value is the integer one (1).
This class implements the FlatMapFunction interface, which takes a string as input and creates a Tuple2:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}
We call the collect() method on the Collector class to push the data forward in the processing pipeline.
Our next and final step is to group the tuples by their first elements (words) and then perform a sum on the second element to produce the number of occurrences of the words:
public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}
We use three types of Flink transformations: flatMap(), groupBy(), and aggregate().
Let’s write a test to confirm that the word count implementation works as expected:
List<String> lines = Arrays.asList(
  "This is the first sentence",
  "This is the second one-word sentence");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);
List<Tuple2<String, Integer>> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("this", 2), new Tuple2<>("is", 2), new Tuple2<>("the", 2),
  new Tuple2<>("first", 1), new Tuple2<>("sentence", 2), new Tuple2<>("second", 1),
  new Tuple2<>("one", 1), new Tuple2<>("word", 1));
DataStream API
Creating a data stream
Apache Flink also supports event stream processing through the DataStream API. If we want to start consuming events, we must first use the StreamExecutionEnvironment class:
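The streaming entry point mirrors the batch one; a minimal sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Obtains a local streaming environment in an IDE,
// or the cluster environment when deployed to a Flink cluster.
StreamExecutionEnvironment executionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment();
```
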
Next, we can create a stream of events from various sources using this environment. It could be a message bus like Apache Kafka, but in this example, we simply create a source from a few string elements:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is the first sentence",
  "This is the second one-word sentence");
We can apply transformations to each element of the DataStream as in a normal DataSet class:
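For instance, to map every element of the stream to upper case (assuming the dataStream defined above):

```java
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

// map() is applied lazily to every event flowing through the stream.
SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);
```
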
To trigger the execution, we need to call a sink operation like print(), which just prints the result of the transformations to standard output, followed by the execute() method in the StreamExecutionEnvironment class:
upperCase.print();
executionEnvironment.execute();
It produces the following output:
1> THIS IS THE FIRST SENTENCE
2> THIS IS THE SECOND ONE-WORD SENTENCE
Windowing Events
When processing a real-time stream of events, it may sometimes be necessary to group events and apply some calculation to a window of those events.
Suppose we have a stream of events, where each event is a pair consisting of an event number and a timestamp of when the event was sent to our system, and that we can tolerate events that are out of sequence, but only if they are not more than twenty seconds late.
In this example, we first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:
SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = env.fromElements(
    new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
    new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
            return element.f1 * 1000;
        }
    });
Next, we define a window operation to group our events into five-second windows and apply a transformation to those events:
SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);

reduced.print();
It takes the maximum element of each five-second window, so it prints:
1> (15,1491221519)
We don’t see the second event because it arrived later than the specified delay threshold.
Conclusion
In this article, we introduced the Apache Flink framework and looked at some of the transformations that come with its API.
We implemented a word count program using Flink’s fluent and functional DataSet API. We then looked at the DataStream API and implemented a simple real-time transformation on a stream of events.
The implementation of all these examples and code snippets can be found on GitHub – this is a Maven project, so it should be easy to import and run as is.