Why You Should and How to Archive Your Kafka Data to Amazon S3

Archive Kafka Data to AWS S3

Over the last decade, the volume and variety of data, and of data storage technologies, have been soaring. Businesses in every industry have been looking for cost-effective ways to store it, and storage is one of the main requirements for data retention. With the shift to [near] real-time data pipelines and the growing adoption of Apache Kafka, companies must find ways to reduce the Total Cost of Ownership (TCO) of their data platform.

Typically, Kafka is configured with a short retention period, often three days, and data older than the retention period is copied, via streaming data pipelines, to scalable external storage for long-term use, such as AWS S3 or the Hadoop Distributed File System (HDFS).

When it comes to moving data out of Apache Kafka, the most successful option is to let Kafka Connect do the work. The Kafka Connect AWS S3 sink used here is open source and can be found on GitHub; the repository hosts a collection of open-source Kafka Connect sinks and sources.

Why Different Long-Term Storage?

Using Kafka as your long-term storage can lead to significant costs. Once the work on tiered storage in the vanilla Kafka distribution is completed, it will improve the cost picture. However, migrating to the latest version might not be an immediate option for all businesses.

Tiered storage might not always be the best solution either; it all depends on the data access pattern. If you deploy ML-driven processes to production daily and the historical data is read heavily, those reads can impact the Kafka brokers' cache and introduce slowness into your cluster.

How to Use It

Here’s how to stream Kafka data to AWS S3. If you do not already have a Connect cluster running on Kafka 2.3 or later, follow the Kafka documentation to set one up.

This exercise uses Avro payloads, and therefore a Schema Registry is required. Alternatively, you can rely on JSON and store the data in AWS S3 as JSON.

Have the AWS access keys and S3 bucket available.

If you already have them, you can jump to the next step. To obtain the access keys, follow the AWS secret keys documentation, and for the S3 bucket setup, follow the S3 documentation. Ensure you use sensible selections for the security and access options.
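If you have the AWS CLI installed and configured, the bucket can also be created from the command line; a minimal sketch (the bucket name and region below are placeholders):

Shell

# Bucket name and region are placeholders; pick your own.
aws s3 mb s3://my-kafka-archive-bucket --region eu-west-1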

Download and install the S3 sink connector

This step is required on all Connect cluster workers.
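The exact download command depends on the release you pick; as a rough sketch, assuming the archive is taken from the stream-reactor GitHub releases page (the URL and version below are placeholders):

Shell

# Placeholder URL/version: pick the kafka-connect-aws-s3 archive from the
# stream-reactor releases page on GitHub and adjust accordingly.
wget https://github.com/lensesio/stream-reactor/releases/download/<VERSION>/kafka-connect-aws-s3-<VERSION>.tar.gz
mkdir -p /opt/kafka-connect-aws-s3
tar -xzf kafka-connect-aws-s3-<VERSION>.tar.gz -C /opt/kafka-connect-aws-s3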

This will unpack the code to the /opt/kafka-connect-aws-s3 folder. From here there are two options: either copy this folder into a location already on your Connect workers' plugin.path, or change the workers' plugin.path configuration to include the S3 sink (this requires the Connect workers to be restarted).

Properties

plugin.path=/usr/share/connectors,/opt/kafka-connect-aws-s3
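After updating plugin.path, restart the Connect workers so the new plugin is picked up. With the vanilla Kafka distribution, a distributed worker is typically (re)started like this (the properties file path is illustrative):

Shell

bin/connect-distributed.sh config/connect-distributed.properties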



Validate the connector is ready to be used

To make sure the connector is ready to be configured, run this curl command (update the endpoint to match your Connect cluster endpoint[-s]).

Shell

curl -sS localhost:8083/connector-plugins | jq '.[].class' | grep S3SinkConnector



The output should read: io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector

Create a topic named orders

The command line utility is provided with any Kafka distribution, and you can find it in the installation folder. 

Shell

bin/kafka-topics.sh --create --topic orders --bootstrap-server localhost:9092
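To confirm the topic was created, you can describe it:

Shell

bin/kafka-topics.sh --describe --topic orders --bootstrap-server localhost:9092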



Create the S3 sink configuration file

Using an editor of your choice, paste the following content into a file named s3.json, and adjust the S3 bucket, the AWS access keys, and the Schema Registry endpoint for your environment.

JSON

{
  "name": "lenses-s3-sink",
  "config": {
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connect.s3.kcql": "INSERT INTO ${BUCKET_NAME}:${PREFIX_NAME} SELECT * FROM orders STOREAS `AVRO` WITH_FLUSH_COUNT = 5000",
    "aws.region": "eu-west-1",
    "aws.access.key": "${YOUR_ACCESS_KEY}",
    "aws.secret.key": "${YOUR_SECRET_KEY}",
    "aws.auth.mode": "Credentials",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }
}



Run the S3 sink

With the configuration file created, the following curl command will instruct Connect to create and run the S3 sink:

Shell

curl -s -X POST -H 'Content-Type: application/json' --data @s3.json http://localhost:8083/connectors
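You can then check that the connector and its task are in the RUNNING state via the Connect REST API (assuming the connector name and endpoint used above):

Shell

curl -s localhost:8083/connectors/lenses-s3-sink/status | jq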



Push data to orders topic

With the sink running, you can insert data into the orders topic using the Avro console producer:

Shell

bin/kafka-avro-console-producer \
  --broker-list localhost:9092 --topic orders \
  --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"created","type":"string"},{"name":"product","type":"string"},{"name":"price","type":"double"},{"name":"qty","type":"int"}]}'



The console is now waiting for your input. Paste this content and hit enter:

JSON

{"id": 1, "created": "2016-05-06 13:53:00", "product": "OP-DAX-P-20150201-95.7", "price": 94.2, "qty": 100}



Check the S3 bucket

You can use the AWS console for that, or the AWS CLI if you have it installed.

Shell

aws s3 ls s3://mybucket/mykey --recursive



Exactly Once

Moving data from Kafka to AWS S3 should follow the principle of least surprise. As a data engineer setting the connector up and then querying the data in S3, you should not expect the same record to appear more than once. However, when the Connect framework re-balances the tasks, the underlying Kafka consumer group is re-balanced, so there is a chance of the same Kafka record being handed to the connector more than once. Thus, a mechanism to ensure a record is written exactly once is required.

When the Connect framework reads a source topic, each of the topic's partitions is handled by only one of the running sink tasks. This benefit comes from Kafka consumer group semantics: each sink task has a Kafka consumer that joins a consumer group unique to the connector (the connector name is the group identifier).

When a topic has four partitions and the sink has been configured with three tasks, one possible workload distribution is for one task to handle two partitions while the other two tasks handle one partition each.

Allocating an S3 writer per input topic-partition avoids a lot of complexity; the complexity would otherwise come from a distributed write, where at least two tasks write records for the same partition.

Records received by the sink are first stored in a temporary file. Only once the commit policy is met is the file moved to the desired location, at which point the records are considered committed.


Writing the file to the desired location is done in such a way as to keep track of the latest Kafka record offset written for a given partition. Having the last written offset is essential in case of failures or stop-start actions.

The Connect framework gives the sink task the option to override the underlying Kafka consumer offsets about to be committed to Kafka. While the temporary file accumulates records, the sink returns to the framework the last offset actually written for the partition. Doing so avoids records being dropped or written more than once.

A simplified version of the Connect flow is illustrated below:

The same rules apply when using the custom data partitioner. In this case, the temporary and the target file names follow this pattern: 

Shell

${BUCKET+PREFIX}/[field1=]value1[/[field2]=value2]/${TOPIC}/${PARTITION}/temp.${FORMAT}

${BUCKET+PREFIX}/[field1=]value1[/[field2]=value2]/${TOPIC}(${PARTITION}_${OFFSET}).${FORMAT}



Connector Configuration

The connector provides a set of configuration options to allow different setups. The full details are available in the code repository on GitHub. Here are the configuration settings specific to the connector:

Name                  Description                                           Type     Values Set
connect.s3.kcql       SQL-like syntax to describe the connector behaviour   string
aws.region            Region                                                string
aws.access.key        Access Key                                            string
aws.secret.key        Secret Key                                            string
aws.auth.mode         Auth Mode                                             string   Credentials, EC2, ECS, Env
aws.custom.endpoint   Custom Endpoint                                       string
aws.vhost.bucket      Enable Vhost Buckets                                  boolean  true, false


To simplify the configuration, the connector relies on a SQL-like syntax (KCQL) to drive its behavior. You can have multiple statements to support reading from multiple [and different] topics within the same connector.

The generic syntax supported is this (keyword order is important):

SQL

INSERT INTO bucketAddress:pathPrefix
SELECT *
FROM kafka-topic
[PARTITIONBY (partition[, partition] ...)]
[STOREAS storage_format]
[WITHPARTITIONER partitioner]
[WITH_FLUSH_SIZE flush_size]
[WITH_FLUSH_INTERVAL flush_interval]
[WITH_FLUSH_COUNT flush_count]



Here are a few examples:

SQL

INSERT INTO testS3Bucket:pathToWriteTo
SELECT *
FROM topicA

INSERT INTO testS3Bucket:pathToWriteTo
SELECT *
FROM topicA
STOREAS `AVRO`
WITH_FLUSH_COUNT = 5000

INSERT INTO kafka-connect-aws-s3-test:headerpartitioningdem
SELECT *
FROM topic-with-headers
PARTITIONBY _header.facilityCountryCode, _header.facilityNum
STOREAS `TEXT`
WITH_FLUSH_COUNT = 100



Output format

Depending on requirements and the input topic[-s] format, the connector can store the data in several formats, including Avro, JSON, and plain text; see the GitHub repository for the full list of supported storage formats.

Flushing data

A critical aspect of writing to S3 is avoiding small files. The connector supports three configuration options to control how often the data is written to the target location. All of them can be used at once, and they are not mutually exclusive.

Using FLUSH_COUNT ensures the resulting files do not contain more records than the value provided. This strategy might still lead to small files being stored; it all depends on the record size.

FLUSH_SIZE gives better control over the output file size. As soon as the file size exceeds the provided value, it is written to the target location. It is not a hard limit, though.

FLUSH_INTERVAL is the fallback. After the given time interval, the file is rolled over even if neither of the other two thresholds has been met; without it, a topic that stops receiving records (or, hypothetically, never receives another record) would leave the already accumulated records unflushed.
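All three policies can be combined in one KCQL statement. As a sketch, assuming WITH_FLUSH_SIZE is expressed in bytes and WITH_FLUSH_INTERVAL in seconds, the running connector's configuration could be updated through the Connect REST API:

Shell

# PUT /connectors/<name>/config replaces the connector configuration in place.
# Bucket, prefix, keys and the flush units are placeholders/assumptions.
curl -s -X PUT -H 'Content-Type: application/json' http://localhost:8083/connectors/lenses-s3-sink/config \
  --data '{
    "connector.class": "io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector",
    "tasks.max": "1",
    "topics": "orders",
    "connect.s3.kcql": "INSERT INTO ${BUCKET_NAME}:${PREFIX_NAME} SELECT * FROM orders STOREAS `AVRO` WITH_FLUSH_SIZE = 500000000 WITH_FLUSH_INTERVAL = 3600 WITH_FLUSH_COUNT = 5000",
    "aws.region": "eu-west-1",
    "aws.access.key": "${YOUR_ACCESS_KEY}",
    "aws.secret.key": "${YOUR_SECRET_KEY}",
    "aws.auth.mode": "Credentials",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081"
  }'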

Data Partition

The default behavior is to store the data per topic-partition tuple, which means the target bucket will have entries for each partition of each topic processed. Here is how your S3 bucket structure will look:

Shell

bucket/prefix/topic/partition/offset.ext



There is another way to partition the data in S3, driven by the data content; the PARTITIONBY statement controls this behavior. You can partition the data, for example, by fields in the record value or by record headers (as in the header-based KCQL example above).

When using the custom partitioner, there are two options that impact the resulting S3 path: Keys and Values, or Values only [the default]. The first option is driven by the KCQL clause WITHPARTITIONER=KeysAndValues. In this case, both the column (field) name and the value contribute to the path:
Shell

bucket/prefix/field1=value1[/field2=value2]/topic(partition_offset).ext



The default behavior, which can also be set explicitly via WITHPARTITIONER=Values, does not retain the column (or key) names in the output path:

Shell

bucket/prefix/value1[/value2]/topic(partition_offset).ext



Conclusion

Using the open-source S3 Kafka connector can help you meet the cost-reduction targets your project [or company] needs.

Storing data in S3 is only part of the story. Sometimes, the S3 data needs to be replayed back too. In my next article, I will cover the benefits of a full Kafka-S3-Kafka circle.

 

 

 

 
