Apache Flink is a big data framework that allows programmers to process huge amounts of data in a highly efficient and scalable way. This article introduces some basic API concepts and the standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink’s central construct: a distributed collection. First, we’ll look at Flink’s DataSet API transformations and use them to implement a word-count program. Then we’ll take a brief look at Flink’s DataStream API, which lets you process streams of events in real time.
Maven Dependencies
To get started, we’ll need to add the flink-java and flink-test-utils libraries as Maven dependencies:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test</scope>
</dependency>
Basic API Concepts
When working with Flink, we need to know a few things about its API:
- Every Flink program performs transformations on distributed collections of data. A variety of data transformation functions are available, including filtering, mapping, joining, grouping, and aggregating.
- A sink operation in Flink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system or printing it to standard output.
- Flink transformations are lazy and are not executed until a sink operation is invoked.
The API has two modes of operation: batch and real-time. If you’re dealing with a bounded data source that can be processed in batch mode, you’ll use the DataSet API. To process unbounded streams of data in real time, you’ll use the DataStream API.
DataSet API Transformations
The entry point to a Flink program is an instance of the ExecutionEnvironment class — this defines the context in which the program is executed.
Let’s create an ExecutionEnvironment to start processing:
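A minimal sketch of that setup, using Flink’s standard factory method (env is the variable the later examples assume):

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();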
Note that running the application on the local machine will do the processing on the local JVM. If you wanted to start processing on a cluster of machines, you would need to install Apache Flink and configure the ExecutionEnvironment accordingly.
Creating a DataSet
We need to supply data to our program to perform data transformations.
You can create a DataSet from multiple sources, such as Apache Kafka, a CSV file, or virtually any other data source.
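As a quick sketch, here is a DataSet of integers built from in-memory elements. The specific values are illustrative, chosen so that the filter-and-reduce example below sums to 90; the amounts variable is reused there:

DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);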
Filter and reduce
Let’s say you want to keep only the numbers above a certain threshold and then sum them all. You can use the filter() and reduce() transformations to achieve this:

int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);
Note that the collect() method is the sink operation that initiates the actual data transformations.
Map
Let’s say you have a DataSet of Person objects:
private static class Person {
private int age;
private String name;
// standard constructors/getters/setters
}
Next, we create a DataSet from these objects:
DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));
Suppose you want to extract only the age field from every object in the collection. You can use the map() transformation to get only a specific field of the Person class:
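A minimal sketch of that extraction; the ages variable is illustrative, and the assertions simply reflect the two Person objects created above:

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);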
Let’s say you have the following collection of Tuple2 objects:

Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);
If you want to sort this collection by the first field of each tuple, you can use the sortPartition() transformation:
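A sketch of that sort, assuming the transactions DataSet defined above. Note that sortPartition() sorts the data within each partition, which on a local single-partition run amounts to sorting the whole collection; Order comes from org.apache.flink.api.common.operators:

List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(0, Order.ASCENDING)
  .collect();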
Word count
Word count is a problem commonly used to demonstrate the capabilities of big data processing frameworks. The basic solution involves counting the occurrences of each word in a text input.
As the first step in our solution, we create a LineSplitter class that splits our input into tokens (words), emitting a Tuple2 key-value pair for each token. In each of these tuples, the key is a word found in the text, and the value is the integer one (1).
This class implements the FlatMapFunction interface, which takes a String as input and produces a Tuple2<String, Integer>:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // Normalize to lower case, split on non-word characters,
        // and emit a (word, 1) pair for every non-empty token.
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}
We call the collect() method on the Collector class to push the data forward in the processing pipeline.
Our next and final step is to group the tuples by their first elements (words) and then perform a sum on the second element to produce the number of occurrences of the words:
public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}
We use three types of Flink transformations: flatMap(), groupBy(), and aggregate().
Let’s write a test to confirm that the word count implementation works as expected:
List<String> lines = Arrays.asList(
  "This is a first sentence",
  "This is a second sentence with a one word");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);
List<Tuple2<String, Integer>> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("a", 3), new Tuple2<>("sentence", 2), new Tuple2<>("word", 1),
  new Tuple2<>("is", 2), new Tuple2<>("this", 2), new Tuple2<>("second", 1),
  new Tuple2<>("first", 1), new Tuple2<>("with", 1), new Tuple2<>("one", 1));
DataStream API
Creating a data stream
Apache Flink also supports event stream processing through the DataStream API. If we want to start consuming events, we must first use the StreamExecutionEnvironment class:
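A one-line sketch of obtaining it, analogous to the batch ExecutionEnvironment; the executionEnvironment variable is reused in the examples below:

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();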
Next, we can create a stream of events from various sources using this environment. It could be a message bus like Apache Kafka, but in this example, we simply create a stream from a few string elements:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is the first sentence",
  "This is the second sentence with one word");
We can apply transformations to every element of the DataStream just as we would with the DataSet class:
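For instance, a map() that converts each sentence to upper case might look like this; the upperCase variable feeds the print() sink below:

SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);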
To trigger the execution, we need to invoke a sink operation such as print(), which just prints the result of the transformations to standard output, and then call the execute() method on the StreamExecutionEnvironment class:

upperCase.print();
executionEnvironment.execute();
It produces the following output:
1> THIS IS THE FIRST SENTENCE
2> THIS IS THE SECOND SENTENCE WITH ONE WORD
Windowing events
When processing a real-time stream of events, it may sometimes be necessary to group events and apply some calculation to a window of those events.
Suppose we have a stream of events, where each event is a pair consisting of an event number and a timestamp of when the event was sent to our system, and that we can tolerate events that are out of sequence, but only if they are not more than twenty seconds late.
In this example, we first create a stream simulating two events that are several minutes apart, then define a timestamp extractor that specifies our lateness threshold:
SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = executionEnvironment.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {

        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
            return element.f1 * 1000;
        }
    });
Next, we define a window operation to group our events into five-second windows and apply a transformation to those events:
SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();
It gets the last element of each five-second window, so it prints:
1> (15,1491221519)
We don’t see the second event because it arrived later than the specified delay threshold.
Conclusion
In this article, we introduced the Apache Flink framework and looked at some of the transformations that come with its API.
We implemented a word count program using Flink’s fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.
The implementation of all these examples and code snippets can be found on GitHub – this is a Maven project, so it should be easy to import and run as is.