Getting Started With Batch Processing Using Apache Flink
If you've been following software development news recently, you have probably heard about a new project called Apache Flink. I've already written about it a bit here and here, but if you are not familiar with it, Apache Flink is a new-generation big data processing tool that can process either finite sets of data (batch processing) or potentially infinite streams of data (stream processing). In terms of new features, many believe Apache Flink is a game changer that could even replace Apache Spark in the future.
In this article, I'll show you how to use Apache Flink to implement simple batch processing algorithms. We will start by setting up our development environment, and then we will see how to load data, process a dataset, and write data back to an external system.
Why Batch Processing?
You might have heard that stream processing is "the new hot thing right now" and that Apache Flink is a tool for stream processing. This raises a question: Why do we need to learn how to implement batch processing applications?
While it is true that stream processing has become more and more widespread, many tasks still require batch processing. Also, if you are just getting started with Apache Flink, in my opinion, it is better to start with batch processing since it is simpler and in a way resembles working with a database. Once you've covered batch processing, you can learn about stream processing where Apache Flink really shines!
How to Follow Examples
If you want to implement some Apache Flink applications yourself, first you need to create a Flink project. In this article, we are going to write applications in Java, but you can also write Flink applications in Scala, Python, or R.
To create a Flink Java project, execute the following command:
mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-java \
    -DarchetypeVersion=1.3.2
After you enter a group ID, an artifact ID, and a project version, this command creates the following project structure:
.
├── pom.xml
└── src
    └── main
        ├── java
        │   └── flinkProject
        │       ├── BatchJob.java
        │       ├── SocketTextStreamWordCount.java
        │       ├── StreamingJob.java
        │       └── WordCount.java
        └── resources
            └── log4j.properties
The most important file here is the massive pom.xml, which specifies all the necessary dependencies. The automatically created Java classes are examples of simple Flink applications that you can take a look at, but we don't need them for our purposes.
To start developing your first Flink application, create a class with the main method like this:
import org.apache.flink.api.java.ExecutionEnvironment;

public class FilterMovies {
    public static void main(String[] args) throws Exception {
        // Create Flink execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // We will write our code here

        // Start Flink application
        env.execute();
    }
}
There is nothing special about this main method. All we have to do is add some boilerplate code.
First, we need to create a Flink execution environment, which behaves differently depending on whether you run it on a local machine or in a Flink cluster:
- On a local machine, it will start a local Flink cluster inside your Java process. This is a good way to test how your application will work in a realistic environment.
- On a Flink cluster, it won't create anything but will use the existing cluster resources instead.
Alternatively, you could create a collection environment like this:
ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
This will create a Flink execution environment that, instead of running the Flink application on a local cluster, emulates all operations using in-memory collections in a single Java process. Your application will run faster, but this environment has some subtle differences from a local cluster with multiple nodes.
Where Do We Start?
Before we can do anything, we need to read data into Apache Flink. We can read data from numerous systems, including the local filesystem, S3, HDFS, HBase, Cassandra, etc. No matter where we read a dataset from, Apache Flink allows us to work with data in a uniform way using the DataSet class:
DataSet<Integer> numbers = ...
All items in a dataset must have the same type. The single generic type parameter specifies the type of data stored in the dataset.
To read data from a file, we can use the readTextFile method, which reads a file line by line and returns a dataset of type String:
DataSet<String> lines = env.readTextFile("path/to/file.txt");
If you specify a file path like this, Flink will attempt to read a local file. If you want to read a file from HDFS, you need to specify the hdfs:// protocol:
DataSet<String> lines = env.readTextFile("hdfs:///path/to/file.txt");
Flink also has support for CSV files, but in this case, it won't return a dataset of strings. It will try to parse every line and return a dataset of Tuple instances:
DataSet<Tuple2<Long, String>> lines = env.readCsvFile("data.csv")
    .types(Long.class, String.class);
Tuple2 is a class that stores a pair of fields (accessible as the public fields f0 and f1), and there are other classes, such as Tuple0, Tuple1, Tuple3, and so on up to Tuple25, that store from zero to twenty-five fields. Later, we will see how to work with these classes.
The types method specifies the types and number of columns in the CSV file, so Flink can read and parse them.
We can also create small in-memory datasets, which are handy for experiments and unit tests:
// Create from a list
DataSet<String> letters = env.fromCollection(Arrays.asList("a", "b", "c"));
// Create from an array
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);
You may wonder what data we can store in a DataSet. Not every Java type can be used in a dataset; there are four categories of types that you can use:
- Built-in Java types and POJO classes
- Flink tuples and Scala case classes
- Values, which are special mutable wrappers for Java primitive types that you can use to increase performance (I'll write about this in one of the upcoming articles)
- Implementations of the Hadoop Writable interface
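The POJO category comes with rules. As I understand Flink's type documentation, a class is treated as a POJO only if it is public (and static, if it is nested), has a public no-argument constructor, and every non-static, non-transient field is either public or has a public getter and setter. The plain-Java sketch below (no Flink dependency) encodes those rules as a rough reflection check. The names looksLikeFlinkPojo, MoviePojo, and ImmutableMovie are hypothetical, and the check is a simplification (it ignores boolean "is" getters and field type support), not Flink's own type extraction. A class that fails the rules can still be stored in a dataset, but Flink then falls back to generic Kryo serialization, which is usually less efficient.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.Set;

class PojoRulesSketch {

    // Hypothetical movie type written to follow the POJO rules described above
    public static class MoviePojo {
        public long id;                           // public field: accessible directly
        private String name;                      // private field: exposed via getName/setName
        private Set<String> genres = new HashSet<>();

        public MoviePojo() { }                    // public no-argument constructor

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Set<String> getGenres() { return genres; }
        public void setGenres(Set<String> genres) { this.genres = genres; }
    }

    // Hypothetical type that breaks the rules: no no-argument constructor and no setter
    public static class ImmutableMovie {
        private final String name;
        public ImmutableMovie(String name) { this.name = name; }
        public String getName() { return name; }
    }

    // Rough approximation of the POJO rules; not Flink's actual TypeExtractor logic
    static boolean looksLikeFlinkPojo(Class<?> clazz) {
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        try {
            clazz.getConstructor();               // throws if there is no public no-arg constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : clazz.getDeclaredFields()) {
            int fieldMods = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(fieldMods)
                    || Modifier.isTransient(fieldMods) || Modifier.isPublic(fieldMods)) {
                continue;                         // these fields need no accessors
            }
            String suffix = Character.toUpperCase(field.getName().charAt(0)) + field.getName().substring(1);
            if (!hasPublicMethod(clazz, "get" + suffix)
                    || !hasPublicMethod(clazz, "set" + suffix, field.getType())) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasPublicMethod(Class<?> clazz, String name, Class<?>... params) {
        try {
            clazz.getMethod(name, params);        // getMethod only finds public methods
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(looksLikeFlinkPojo(MoviePojo.class));      // true
        System.out.println(looksLikeFlinkPojo(ImmutableMovie.class)); // false
    }
}
```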
Processing Data With Apache Flink
Now to the data processing part! How do you implement an algorithm for processing your data? To do this, you can use a number of operations that resemble Java 8 stream operations, such as:
- map: Converts items in a dataset using a user-defined function. Every input element is converted into exactly one output element.
- filter: Filters items in a dataset according to a user-defined function.
- flatMap: Similar to the map operator, but allows returning zero, one, or many elements.
- groupBy: Groups elements by a key. Similar to the GROUP BY operator in SQL.
- project: Selects specified fields in a dataset of tuples, similar to the SELECT operator in SQL.
- reduce: Combines elements in a dataset into a single value using a user-defined function.
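Since these operations resemble Java 8 stream operations, it may help to see their in-memory counterparts. The plain-Java sketch below (no Flink involved; the class and method names are hypothetical) shows map producing exactly one output per input, flatMap producing zero or more, filter keeping matching elements, and a groupBy followed by a per-group reduce, which Java streams express with Collectors.groupingBy and Collectors.reducing. The project operation has no direct stream counterpart, so it is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class StreamAnalogs {

    // map: exactly one output element per input element
    static List<Integer> lengths(List<String> words) {
        return words.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap + filter: each line yields zero, one, or many words; empty tokens are dropped
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    // groupBy + reduce: group words by first letter, then sum the lengths within each group
    static Map<String, Integer> totalLengthByFirstLetter(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(
                word -> word.substring(0, 1),
                Collectors.reducing(0, String::length, Integer::sum)));
    }

    // reduce: combine all elements into a single value
    static int sum(List<Integer> numbers) {
        return numbers.stream().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be", "", "or not");
        System.out.println(words(lines));               // [to, be, or, not]
        System.out.println(lengths(words(lines)));      // [2, 2, 2, 3]
        System.out.println(sum(lengths(words(lines)))); // 9
    }
}
```

Note how the empty line contributes no elements at all to the flatMap output, which is exactly what map cannot do.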
Keep in mind that the biggest difference between Java 8 streams and these operations is that Java 8 streams work with in-memory data in a single process and can access local data, while Flink works with data distributed across a cluster.
Let's take a look at a simple example that uses these operations. It creates a dataset of numbers, squares every number, and filters out all odd numbers.
// Create a dataset of numbers
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7);

// Square every number
DataSet<Integer> result = numbers.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer integer) throws Exception {
        return integer * integer;
    }
})
// Leave only even numbers
.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer integer) throws Exception {
        return integer % 2 == 0;
    }
});
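To check what this job should produce, here is the same logic as a plain Java 8 stream (a hypothetical in-memory sketch, not Flink code). The squares of 1 to 7 are 1, 4, 9, 16, 25, 36, and 49, and only 4, 16, and 36 are even.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SquareEvens {

    // In-memory equivalent of the Flink map + filter chain above
    static List<Integer> squareAndKeepEven(List<Integer> numbers) {
        return numbers.stream()
                .map(n -> n * n)            // square every number
                .filter(n -> n % 2 == 0)    // leave only even numbers
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(squareAndKeepEven(Arrays.asList(1, 2, 3, 4, 5, 6, 7))); // [4, 16, 36]
    }
}
```

Flink should produce the same values, but since the job may run with a parallelism greater than one, the order in which they are printed or written is not guaranteed.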
If you have any experience with Java 8, you are probably wondering why I don't use lambdas here. We can use lambdas, but they can cause some complications (because of Java type erasure, Flink cannot always infer the result type of a lambda), as I've written here.
Saving Data Back
After we've finished processing our data, it makes sense to save the result of our hard work. Flink can store data in a number of third-party systems, such as HDFS, S3, Cassandra, etc.
For example, to write data to a file, we need to use the writeAsText method from the DataSet class:
DataSet<Integer> ds = ...
ds.writeAsText("path/to/file");
For debugging and testing purposes, Flink can write data to standard output or to standard error:
DataSet<Integer> ds = ...
// Output dataset to the standard output
ds.print();
// Output dataset to the standard error
ds.printToErr();
// Note: in the DataSet API, print() and printToErr() trigger job execution immediately
More Complicated Example
To implement some meaningful algorithms, we first need to download a GroupLens movies dataset. It contains several CSV files with information about movies and movie ratings. We are going to work with the movies.csv file from this dataset, which contains a list of all movies and looks like this:
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller
It has three columns:
- movieId: A unique ID of the movie in this dataset.
- title: The title of the movie.
- genres: A |-separated list of genres for the movie.
We can now load this CSV file in Apache Flink and perform some meaningful processing. Here, we will load the file from the local filesystem; in a realistic environment, you would read a much bigger dataset that would probably reside in a distributed system, such as S3 or HDFS.
In this demo, let's find all movies of the "Action" genre. Here is a code snippet that does this:
// Load dataset of movies
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    .ignoreFirstLine()
    .parseQuotedStrings('"')
    .ignoreInvalidLines()
    .types(Long.class, String.class, String.class);

DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long, String, String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});

DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});

filteredMovies.writeAsText("output.txt");
Let's break it down. First, we read a CSV file using the readCsvFile method:
DataSet<Tuple3<Long, String, String>> lines = env.readCsvFile("movies.csv")
    // Ignore the CSV header
    .ignoreFirstLine()
    // Set the character used to quote strings
    .parseQuotedStrings('"')
    // Ignore invalid lines in the CSV file
    .ignoreInvalidLines()
    // Specify types of columns in the CSV file
    .types(Long.class, String.class, String.class);
Using helper methods, we specify how to parse strings in the CSV file and that we need to skip the first line. In the last line, we specify the type of each column in the CSV file, and Flink will parse the data for us.
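To make these helper methods less abstract, here is a simplified plain-Java sketch of what they do to a single line. It is a hypothetical stand-in, not Flink's CSV parser, and it does not handle escaped quotes. A title in double quotes may contain commas, so commas count as separators only outside quotes (the job of parseQuotedStrings('"')); a line that does not yield a numeric ID and two strings is rejected (similar in spirit to ignoreInvalidLines() combined with types(...)), which is also why the header line has to be skipped. The quoted sample title is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class CsvLineSketch {

    // Splits a CSV line on commas that are outside double quotes; quote characters are dropped
    static List<String> splitQuoted(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : line.toCharArray()) {
            if (c == '"') {
                inQuotes = !inQuotes;             // enter or leave a quoted string
            } else if (c == ',' && !inQuotes) {
                fields.add(current.toString());   // a separator ends the current field
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString());           // the last field has no trailing comma
        return fields;
    }

    // Converts a line to (Long, String, String), or returns null if it cannot be parsed
    static Object[] parseMovieLine(String line) {
        List<String> fields = splitQuoted(line);
        if (fields.size() != 3) {
            return null;
        }
        try {
            return new Object[] {Long.parseLong(fields.get(0)), fields.get(1), fields.get(2)};
        } catch (NumberFormatException e) {
            return null;                          // e.g. the header "movieId,title,genres"
        }
    }

    public static void main(String[] args) {
        Object[] movie = parseMovieLine("42,\"Example Title, The (1999)\",Drama|Romance");
        System.out.println(movie[1]);                                       // Example Title, The (1999)
        System.out.println(parseMovieLine("movieId,title,genres") == null); // true
    }
}
```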
Now that we have a dataset loaded in a Flink cluster, we can do some data processing. First, we parse a list of genres for every movie using the map method:
DataSet<Movie> movies = lines.map(new MapFunction<Tuple3<Long, String, String>, Movie>() {
    @Override
    public Movie map(Tuple3<Long, String, String> csvLine) throws Exception {
        String movieName = csvLine.f1;
        String[] genres = csvLine.f2.split("\\|");
        return new Movie(movieName, new HashSet<>(Arrays.asList(genres)));
    }
});
To transform every movie, we need to implement a MapFunction that receives every CSV record as a Tuple3 instance and converts it into an instance of the Movie POJO class:
class Movie {
    private String name;
    private Set<String> genres;

    public Movie(String name, Set<String> genres) {
        this.name = name;
        this.genres = genres;
    }

    public String getName() { return name; }

    public Set<String> getGenres() { return genres; }

    // writeAsText() writes each element's toString()
    @Override
    public String toString() { return name + " " + genres; }
}
If you recall the structure of the CSV file, the second column contains the name of a movie and the third column contains a list of genres. Since tuple fields are numbered from zero, we access these columns using the fields f1 and f2, respectively.
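One detail in this map function is easy to miss: String.split takes a regular expression, and | is the alternation operator in regex syntax, so it has to be escaped as "\\|" (Pattern.quote("|") would also work). The plain-Java sketch below, with a hypothetical parseGenres helper, shows the difference.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class GenreSplit {

    // Same parsing as in the MapFunction: split on a literal '|' and collect into a set
    static Set<String> parseGenres(String genresColumn) {
        return new HashSet<>(Arrays.asList(genresColumn.split("\\|")));
    }

    public static void main(String[] args) {
        String column = "Adventure|Children|Fantasy";
        System.out.println(parseGenres(column).size());               // 3
        System.out.println(parseGenres(column).contains("Children")); // true

        // Unescaped, "|" is an empty pattern that matches the empty string at every position,
        // so the result is one-character strings rather than genres
        System.out.println(Arrays.toString(column.split("|")));
    }
}
```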
Now that we have a dataset of movies, we can implement the core part of our algorithm and filter all action movies:
DataSet<Movie> filteredMovies = movies.filter(new FilterFunction<Movie>() {
    @Override
    public boolean filter(Movie movie) throws Exception {
        return movie.getGenres().contains("Action");
    }
});
This will only return movies that contain "Action" in the set of genres.
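Before running the job on the full dataset, it can help to check the filtering logic on the ten sample rows shown earlier. The plain-Java sketch below (hypothetical names, no Flink) skips the header as ignoreFirstLine() does, takes the title as the text between the first and the last comma (this works because IDs and genres contain no commas), and keeps titles whose genre set contains "Action". It does not strip quotes, so quoted titles from the full file would keep their quote characters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ActionFilterCheck {

    // The sample rows from movies.csv shown earlier, including the header
    static List<String> sampleLines() {
        return Arrays.asList(
                "movieId,title,genres",
                "1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy",
                "2,Jumanji (1995),Adventure|Children|Fantasy",
                "3,Grumpier Old Men (1995),Comedy|Romance",
                "4,Waiting to Exhale (1995),Comedy|Drama|Romance",
                "5,Father of the Bride Part II (1995),Comedy",
                "6,Heat (1995),Action|Crime|Thriller",
                "7,Sabrina (1995),Comedy|Romance",
                "8,Tom and Huck (1995),Adventure|Children",
                "9,Sudden Death (1995),Action",
                "10,GoldenEye (1995),Action|Adventure|Thriller");
    }

    // Returns the titles of all rows whose genres include "Action"
    static List<String> actionTitles(List<String> csvLines) {
        List<String> titles = new ArrayList<>();
        for (String line : csvLines.subList(1, csvLines.size())) { // skip the header row
            int firstComma = line.indexOf(',');
            int lastComma = line.lastIndexOf(',');
            String title = line.substring(firstComma + 1, lastComma);
            Set<String> genres = new HashSet<>(Arrays.asList(line.substring(lastComma + 1).split("\\|")));
            if (genres.contains("Action")) {
                titles.add(title);
            }
        }
        return titles;
    }

    public static void main(String[] args) {
        System.out.println(actionTitles(sampleLines()));
        // [Heat (1995), Sudden Death (1995), GoldenEye (1995)]
    }
}
```

On these rows, the Flink job should select the same three movies, although not necessarily in this order.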
The last step is very straightforward. We store the result data in a file:
filteredMovies.writeAsText("output.txt");
This simply stores the result data in a local text file, but, as with the readTextFile method, we could write this file to HDFS or S3 by specifying a protocol such as hdfs://.
More Information
This was an introductory article, and there is much more to Apache Flink. I will write more articles about Flink in the near future, so stay tuned! You can read my other articles here, or you can take a look at my Pluralsight course, Understanding Apache Flink, where I cover Apache Flink in more detail.