Streaming ETL With Apache Flink - Part 1


Flink: as fast as squirrels

Introduction

After working on several projects that involved batch ETL by polling data sources, I started working on streaming ETL. Streaming computation is necessary for use cases that require real-time or near-real-time analysis. For example, in IT Operations Analytics, it is paramount that Ops receive critical alert information in real time, or within an acceptable latency (near real time), to help them mitigate downtime or errors caused by misconfiguration.

While there are many introductory articles on Flink (my personal favorites are the blog posts by Ivan Mushketyk), few go into the details of streaming ETL and the advanced aspects of the Flink framework that are useful in a production environment.

In this blog, I will start with a simple example to help us understand Flink's basic streaming components. In upcoming posts, I will be sharing a few real-world use cases and some advanced Flink topics.


Why Flink?

I have been through the “DIY” or Do It Yourself mantra and personally would not recommend that for this use case. A proven and growing streaming computation framework is much better than writing and maintaining one yourself.

As a developer, I want some sort of freedom to provide complicated analysis logic to the framework that can be bundled easily instead of dealing with the steep learning curve and struggling with limitations of the framework itself.

Flink is lightweight compared to Spark, easy for developers to get started with, and provides the APIs I need. Having said that, I will share my experience with a couple of other frameworks later in this series.

Let’s Get Started

This section introduces Flink v1.9 through a simple integer aggregation. The example shown in this post is available on GitHub, so I will only walk through the important code and not go in depth on the Maven setup.

Note: You don't need to download Flink to run this code. Maven will do the trick.

1. Create a streaming execution environment.

StreamExecutionEnvironment senv =  StreamExecutionEnvironment.getExecutionEnvironment();


2. Add a source that will continuously generate integer data. I am using a custom source function instead of the predefined ones. You can check the predefined ones here. The IntegerGeneratorSource class generates 10 integers every second.

IntegerGeneratorSource source = new IntegerGeneratorSource();
DataStream<Integer> dataStream = senv.addSource(source);


3. The IntegerGeneratorSource class implements SourceFunction<T>, where T is the type of object produced by this source, in this example an Integer. The following code snippet shows the two methods that are important to us.

@Override
public void run( SourceContext<Integer> ctx ) throws Exception
{
   int counter = 1;
   while( isRunning )
   {
       // by default emit 10 integers per second
       Thread.sleep( sleepTimer );
       ctx.collect( counter++ );
   }
}

@Override
public void cancel()
{
   isRunning = false;
}


Tip – Do not use ctx.wait() to pace the source. Instead, use your own timer, or let the source emit each record as soon as it is available.
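The run()/cancel() pair above relies on a flag that one thread sets and another thread reads. Below is a minimal plain-Java sketch of that pattern, with no Flink types involved. The class name PacedIntegerGenerator and the helper runUntil() are hypothetical, and declaring the flag volatile is my assumption about how isRunning should be declared, since cancel() is normally called from a different thread than run().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

// Plain-Java stand-in for the source in this post (hypothetical class, no Flink types).
final class PacedIntegerGenerator {
    // volatile so that a cancel() issued from another thread is seen by the loop in run()
    private volatile boolean isRunning = true;
    private final long sleepMillis;

    PacedIntegerGenerator(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    // Emits 1, 2, 3, ... to the consumer until cancel() is called.
    void run(IntConsumer out) throws InterruptedException {
        int counter = 1;
        while (isRunning) {
            Thread.sleep(sleepMillis); // pacing with our own timer; 100 ms gives roughly 10 values per second
            out.accept(counter++);
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs the generator on a worker thread, cancels it from the calling thread
    // once `target` values were emitted, and returns how many values were emitted in total.
    static int runUntil(int target, long sleepMillis) {
        PacedIntegerGenerator generator = new PacedIntegerGenerator(sleepMillis);
        AtomicInteger emitted = new AtomicInteger();
        CountDownLatch reached = new CountDownLatch(target);
        Thread worker = new Thread(() -> {
            try {
                generator.run(value -> {
                    emitted.incrementAndGet();
                    reached.countDown();
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            reached.await();    // wait until `target` values were emitted
            generator.cancel(); // flip the flag from the calling thread
            worker.join();      // run() observes the flag and returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntil(5, 10));
    }
}
```

The loop checks the flag once per iteration, so after cancel() the generator stops within roughly one sleep interval.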

4. So, now we have our data source and data stream object created. Next, provide business logic to Flink (what it should do with the data collected during a time window).

dataStream
    .timeWindowAll(Time.seconds(5))
    .apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
        private static final long serialVersionUID = -6868504589463321679L;

        // Sum all the integers within this time window
        @Override
        public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
            Integer result = 0;
            for (Integer v : values) {
                result += v;
            }
            out.collect(result);
        }
    })
    .addSink(new SinkFunction<Integer>() {
        private static final long serialVersionUID = 4123288113227508752L;

        // Computation done for this time window; simply print the aggregated value
        @Override
        public void invoke(Integer value) {
            System.out.println(value);
        }
    });


I am using a five-second, time-window-based computation on the streaming data. It tells Flink to buffer incoming data for each five-second interval and then pass the entire buffer to apply(). This is an example of a tumbling window, where consecutive windows do not overlap, so each record belongs to exactly one window.
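To make "windows do not overlap" concrete, here is a small plain-Java sketch of how a timestamp can be mapped to a tumbling window. It assumes non-negative timestamps in milliseconds and windows aligned to multiples of the window size. The class TumblingWindows and its methods are hypothetical helpers for illustration, not Flink's own code.

```java
// Plain-Java illustration of tumbling-window assignment (hypothetical helper, not Flink code).
final class TumblingWindows {
    // Start of the window containing the timestamp; windows are aligned to multiples of sizeMillis.
    // Assumes timestampMillis >= 0.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // End of that window (exclusive), so adjacent windows share no timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = 5_000L; // five-second window
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because each window is a half-open interval, a record stamped 4,999 ms falls into [0, 5000) and one stamped 5,000 ms falls into [5000, 10000); no record is counted twice.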

apply() is where I perform a computation over all the data from the last five seconds, which will be at most 50 integers (10 per second for five seconds). Here, it is a simple sum of all the values collected. Finally, I send the output back to Flink using the out object of the Collector class. This step is very similar to a “reduction.”

Now that Flink has the computed value for the given window, I use addSink() to print it. Source and sink are two patterns in streaming that are similar to producer and consumer. A sink is usually an OLAP store if the raw data has been ETL’ed as expected, or it can feed another source for further processing until the data has been transformed into something useful.

Running this code keeps printing integers for as long as the program is running.
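As a quick check of the arithmetic, the sketch below reproduces the per-window sums in plain Java under an idealized assumption: every window receives exactly 50 consecutive integers, starting from 1. The class WindowSumSketch is hypothetical. In a real run the first window may be partial and timing jitter can shift a value into the neighboring window, so the printed sums can differ.

```java
import java.util.ArrayList;
import java.util.List;

// Idealized model of the windowed sum (hypothetical class, no Flink types).
final class WindowSumSketch {
    // Sums `perWindow` consecutive integers per window, counting up from 1, for `windows` windows.
    static List<Integer> windowSums(int perWindow, int windows) {
        List<Integer> sums = new ArrayList<>();
        int counter = 1;
        for (int w = 0; w < windows; w++) {
            int result = 0;
            for (int i = 0; i < perWindow; i++) {
                result += counter++;
            }
            sums.add(result);
        }
        return sums;
    }

    public static void main(String[] args) {
        int perWindow = 10 * 5; // 10 integers per second over a five-second window
        System.out.println(windowSums(perWindow, 3)); // prints [1275, 3775, 6275]
    }
}
```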

Printing integers while the program is running


Flink and GC

To force GC to run sooner, I set -Xmx64m under Run Configurations in Eclipse, and the moment I started the program, Flink threw the following exception (showing the first few lines of the trace):

Exception in thread "main" org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration value for (taskmanager.network.memory.fraction, taskmanager.network.memory.min, taskmanager.network.memory.max) : (0.1, 67108864, 1073741824) - Network buffer memory size too large: 67108864 >= 59768832 (maximum JVM memory size) at org.apache.flink.runtime.util.ConfigurationParserUtils.checkConfigParameter(ConfigurationParserUtils.java:133)

The message is self-explanatory, but I went ahead and searched Flink's docs, which brought me here. As per the documentation:

Task Manager documentation in Flink



Fair enough, Flink would like at least 64 MB for network buffers. So, what is a network buffer anyway? Flink has an answer here.

“Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the application. A sufficient number of network buffers is critical to achieve a good throughput”

No surprises here, and the best part was the following:

Image title

Since I am not using any network connection (external or internal) in this sample code, I was happy to give Flink at least 64 MB of memory, but I had to provide a minimum of 73 MB to make my sample code run when I launched it from Eclipse. After it had run for a while, I saw GC occurring regularly and in a timely manner, so Flink was managing memory as expected.

Next, I removed the Thread.sleep() call from the IntegerGeneratorSource::run() method so that integers were emitted without any delay. When I ran this code, I expected GC to occur frequently, but instead Flink immediately threw an exception and exited.
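The byte counts in the exception message are easier to read in MiB. The sketch below converts them and repeats the comparison that the message reports. It is a simplified illustration rather than Flink's actual validation code, and the class NetworkMemoryCheck and its methods are hypothetical.

```java
// Simplified illustration of the numbers in the exception message (hypothetical class, not Flink code).
final class NetworkMemoryCheck {
    static final long MIB = 1024L * 1024L;

    static long toMiB(long bytes) {
        return bytes / MIB;
    }

    // Mirrors the condition quoted in the message: "Network buffer memory size too large: A >= B".
    static boolean networkMemoryTooLarge(long networkBytes, long maxJvmBytes) {
        return networkBytes >= maxJvmBytes;
    }

    public static void main(String[] args) {
        long networkMin = 67_108_864L; // taskmanager.network.memory.min, taken from the message
        long maxJvm = 59_768_832L;     // "maximum JVM memory size", taken from the message
        System.out.println(toMiB(networkMin));                         // prints 64
        System.out.println(toMiB(maxJvm));                             // prints 57
        System.out.println(networkMemoryTooLarge(networkMin, maxJvm)); // prints true
        // What the current JVM reports as its maximum; this varies with -Xmx and the JVM in use.
        System.out.println(toMiB(Runtime.getRuntime().maxMemory()));
    }
}
```

Note that with -Xmx64m the JVM reported only 57 MiB as its maximum in that run, so the 64 MiB minimum for network buffers could not fit, which is consistent with having to raise the heap setting above 64 MB.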

-1690621529
-1320000479
471351392
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:627)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.microservices.streaming.FlinkSimpleTest.init(FlinkSimpleTest.java:58)
at org.microservices.streaming.FlinkSimpleTest.main(FlinkSimpleTest.java:18)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3181)
at java.util.ArrayList.grow(ArrayList.java:265)
at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:239)
at java.util.ArrayList.ensureCapacityInternal(ArrayList.java:231)
at java.util.ArrayList.add(ArrayList.java:462)
at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:101)


It ran for three time windows before hitting the OOM. Checking the Flink code, the exception was thrown from list.add(value) in the following snippet from HeapListState.java.

final StateTable<K, N, List<V>> map = stateTable;
List<V> list = map.get(namespace);

if (list == null) {
    list = new ArrayList<>();
    map.put(namespace, list);
}
list.add(value);


As of now, I have not been able to find out why this list is not cleared after every window. When using a tumbling window, Flink sends only the current set of records. My guess is that within that time window, the number of integers received through collect() grew so large that the ArrayList could not get enough capacity from the JVM.
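One way to see why an unthrottled source can exhaust a small heap: Flink's window documentation describes functions that receive the whole window at once, such as the one passed to apply(), as requiring all elements to be buffered until the window fires, while ReduceFunction and AggregateFunction are aggregated incrementally. The plain-Java sketch below contrasts the two approaches. The class BufferedVsIncremental is hypothetical, and it only models the memory behavior rather than reproducing Flink's state handling. It also sums into a long, since the negative values printed before the OOM suggest that the Integer sum had already overflowed.

```java
import java.util.ArrayList;
import java.util.List;

// Contrast between buffering a whole window and folding values as they arrive
// (hypothetical class, models the idea only).
final class BufferedVsIncremental {
    // Keeps every value until the "window" is evaluated; memory grows with the element count.
    static long bufferedSum(int count) {
        List<Integer> buffer = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            buffer.add(i); // each add may trigger a resize that copies the backing array
        }
        long sum = 0;
        for (int v : buffer) {
            sum += v;
        }
        return sum;
    }

    // Folds each value into a running total; memory use does not depend on the element count.
    static long incrementalSum(int count) {
        long sum = 0;
        for (int i = 1; i <= count; i++) {
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) {
        int count = 100_000;
        System.out.println(bufferedSum(count));    // prints 5000050000
        System.out.println(incrementalSum(count)); // prints 5000050000
    }
}
```

Both methods return the same total, but only the buffered version needs memory proportional to the number of values that arrive within one window.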

Suggestion to Flink team

Suggestion to Developer

Anyway, I added a delay of one millisecond before ctx.collect() and ran the code again. Flink was doing great in terms of memory management.

Memory management during runtime


Now that we have a basic code example ready, in the next post, I will optimize this example using a few of Flink's APIs.

