Kafka on Kubernetes, the Strimzi Way (Part 2)

We kicked off the first part of the series by setting up a single-node Kafka cluster that was accessible only to internal clients within the same Kubernetes cluster, had no encryption, authentication, or authorization, and used temporary persistence. We will keep iterating and improving on this over the course of this blog series.

This part will cover these topics:

The code is available on GitHub - https://github.com/abhirockzz/kafka-kubernetes-strimzi/

What Do I Need to Try This Out?

kubectl - https://kubernetes.io/docs/tasks/tools/install-kubectl/

I will be using Azure Kubernetes Service (AKS) to demonstrate the concepts, but by and large it is independent of the Kubernetes provider (e.g. feel free to use a local setup such as minikube). If you want to use AKS, all you need is a Microsoft Azure account which you can get for FREE if you don't have one already.

I will not be repeating some of the common sections (such as installation/setup for Helm, Strimzi, and Azure Kubernetes Service, as well as the Strimzi overview) in this or subsequent parts of the series. Please refer to part one for those details.

Let's Create an Externally Accessible Kafka Cluster

To achieve this, we just need to tweak the Strimzi Kafka resource a little. I am highlighting the key part below (see part 1 for the original manifest):

spec:
  kafka:
    version: 2.4.0
    replicas: 1
    listeners:
      plain: {}
      external:
        type: loadbalancer
        tls: true


What changed?

To make Kafka accessible to external client applications, we added an external listener of type loadbalancer. Since we will be exposing our application to the public Internet, we need additional layers of protection such as transport-level security (TLS/SSL encryption) and application-level security (authentication and authorization). In this part, we will configure only encryption and explore the other aspects in another blog. To configure end-to-end TLS encryption, we add tls: true

tls: true is actually the default, but I have added it explicitly for the sake of clarity

To create the cluster:

kubectl apply -f https://github.com/abhirockzz/kafka-kubernetes-strimzi/raw/master/part-2/kafka.yaml



Kubernetes Magic!

The Strimzi Operator kicks into action and does all the heavy lifting for us:

I will be highlighting the resources created corresponding to the external listener and TLS encryption. For a walk through of ALL the resources which are created as part of the Kafka cluster, please refer to part 1

If you look for the Services, you will see something similar to this:

kubectl get svc

my-kafka-cluster-kafka-0                    LoadBalancer   10.0.162.98    40.119.233.2    9094:31860/TCP               60s
my-kafka-cluster-kafka-bootstrap            ClusterIP      10.0.200.20    <none>          9091/TCP,9092/TCP            60s
my-kafka-cluster-kafka-brokers              ClusterIP      None           <none>          9091/TCP,9092/TCP            60s
my-kafka-cluster-kafka-external-bootstrap   LoadBalancer   10.0.122.211   20.44.239.202   9094:32267/TCP               60s
my-kafka-cluster-zookeeper-client           ClusterIP      10.0.137.33    <none>          2181/TCP                     82s
my-kafka-cluster-zookeeper-nodes            ClusterIP      None           <none>          2181/TCP,2888/TCP,3888/TCP   82s



Notice the my-kafka-cluster-kafka-external-bootstrap Service of the type LoadBalancer? Since I am using Azure Kubernetes Service, this is powered by an Azure Load Balancer which has a public IP (20.44.239.202 in this example) and exposes Kafka to external clients over port 9094. You should be able to locate it using the Azure CLI (or the Azure portal if you prefer) by using the az network lb list command

export AKS_RESOURCE_GROUP=[replace with resource group name]
export AKS_CLUSTER_NAME=[replace with AKS cluster name]
export AKS_LOCATION=[replace with region e.g. southeastasia]

az network lb list -g MC_${AKS_RESOURCE_GROUP}_${AKS_CLUSTER_NAME}_${AKS_LOCATION}



What about the encryption part?

To figure that out, let's introspect the Kafka server configuration:

As explained in the previous blog, this is stored in a ConfigMap

export CLUSTER_NAME=my-kafka-cluster
kubectl get configmap/${CLUSTER_NAME}-kafka-config -o yaml



This is what the Common listener configuration in server.config reveals:

listeners=REPLICATION-9091://0.0.0.0:9091,PLAIN-9092://0.0.0.0:9092,EXTERNAL-9094://0.0.0.0:9094
advertised.listeners=REPLICATION-9091://my-kafka-cluster-kafka-${STRIMZI_BROKER_ID}.my-kafka-cluster-kafka-brokers.default.svc:9091,PLAIN-9092://my-kafka-cluster-kafka-${STRIMZI_BROKER_ID}.my-kafka-cluster-kafka-brokers.default.svc:9092,EXTERNAL-9094://${STRIMZI_EXTERNAL_9094_ADVERTISED_HOSTNAME}:${STRIMZI_EXTERNAL_9094_ADVERTISED_PORT}
listener.security.protocol.map=REPLICATION-9091:SSL,PLAIN-9092:PLAINTEXT,EXTERNAL-9094:SSL



Notice that in addition to inter-broker replication (over port 9091) and unencrypted internal client access (from within the Kubernetes cluster) over the non-TLS port 9092, the appropriate listener configuration has been added for TLS-encrypted access over port 9094
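To check which listeners ended up TLS-enabled without reading the whole file, the protocol map can be filtered with standard shell tools. This is a minimal sketch: tls_listeners is a hypothetical helper name, and it assumes the broker configuration text is supplied on standard input.

```shell
# Print the names of listeners whose security protocol is SSL.
# Reads Kafka broker configuration text on stdin (hypothetical helper).
tls_listeners() {
  grep '^listener\.security\.protocol\.map=' \
    | cut -d= -f2 \
    | tr ',' '\n' \
    | awk -F: '$2 == "SSL" { print $1 }'
}
```

Fed the three configuration lines shown above, this prints REPLICATION-9091 and EXTERNAL-9094, one per line, while PLAIN-9092 is left out because it maps to PLAINTEXT.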

The Moment of Truth....

To confirm, let's try out a couple of client applications which will communicate with our freshly minted Kafka cluster on Kubernetes! We will produce and consume messages using the following:

Communication with our Kafka cluster has to be encrypted (non-TLS client connections will be rejected). TLS/SSL implies one-way authentication, in which the client validates the Kafka broker's identity. To do this, client applications need to trust the cluster CA certificate. Remember that the cluster CA certificate is stored in a Kubernetes Secret (refer to details in part 1). By default, these certificates are auto-generated by Strimzi, but you can provide your own as well (see https://strimzi.io/docs/operators/master/using.html#kafka-listener-certificates-str)

Start by extracting the cluster CA certificate and password:

export CLUSTER_NAME=my-kafka-cluster

kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt
kubectl get secret $CLUSTER_NAME-cluster-ca-cert -o jsonpath='{.data.ca\.password}' | base64 --decode > ca.password


You should have two files: ca.crt and ca.password. Feel free to check out their contents

While some Kafka clients (e.g. Confluent Go client) use the CA certificate directly, others (e.g. Java client, Kafka CLI etc.) require access to the CA certificate via a truststore. I am using the built-in truststore which comes in with a JDK (Java) installation - but this is just for convenience and you're free to use other options (such as creating your own)

export CERT_FILE_PATH=ca.crt
export CERT_PASSWORD_FILE_PATH=ca.password

# replace this with the path to your truststore
export KEYSTORE_LOCATION=/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib/security/cacerts
export PASSWORD=`cat $CERT_PASSWORD_FILE_PATH`
export CA_CERT_ALIAS=strimzi-kafka-cert

# you will be prompted for the truststore password. For the JDK truststore, the default password is "changeit"
# Type yes in response to the 'Trust this certificate? [no]:' prompt
sudo keytool -importcert -alias $CA_CERT_ALIAS -file $CERT_FILE_PATH -keystore $KEYSTORE_LOCATION -keypass $PASSWORD

sudo keytool -list -alias $CA_CERT_ALIAS -keystore $KEYSTORE_LOCATION
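If you would rather not modify the JDK's built-in truststore, a dedicated truststore file is one alternative. The sketch below assumes keytool is on your PATH. The helper name create_truststore, the file name kafka-truststore.jks, and the password used in the usage comment are illustrative choices, not something the Strimzi setup requires.

```shell
# Import a CA certificate into a separate truststore file (hypothetical helper).
# $1: CA certificate in PEM format, e.g. ca.crt
# $2: truststore file to create, e.g. kafka-truststore.jks
# $3: truststore password (keytool expects at least 6 characters)
create_truststore() {
  keytool -importcert -noprompt -alias strimzi-kafka-cert \
    -file "$1" -keystore "$2" -storepass "$3"
}

# Usage (assumed file name and password):
#   create_truststore ca.crt kafka-truststore.jks changeit
```

Since the file is created in a directory you own, sudo is not needed, and -noprompt skips the interactive trust confirmation. Clients would then point their truststore location at this file instead of the JDK's cacerts.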



That's it for the base setup - you are ready to try out the Kafka CLI client!

Please note that the configuration steps for the Kafka CLI detailed below will also work for Java clients - give it a try!

Extract the LoadBalancer public IP for Kafka cluster

export KAFKA_CLUSTER_NAME=my-kafka-cluster

kubectl get service/${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap --output=jsonpath='{.status.loadBalancer.ingress[0].ip}'
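A cloud load balancer can take a little while to be provisioned, and until then the query above returns nothing. Here is a rough sketch of a polling loop around the same kubectl query. The helper name wait_for_external_ip, the retry count, and the sleep interval are all assumptions, and it expects the provider to report an IP address (as AKS does in this example).

```shell
# Poll a Service until Kubernetes reports a load balancer IP (hypothetical helper).
# $1: Service name
# $2: maximum number of attempts (default 30)
# $3: seconds to wait between attempts (default 5)
wait_for_external_ip() {
  svc_name=$1
  attempts=${2:-30}
  interval=${3:-5}
  while [ "$attempts" -gt 0 ]; do
    lb_ip=$(kubectl get "service/$svc_name" --output=jsonpath='{.status.loadBalancer.ingress[0].ip}')
    if [ -n "$lb_ip" ]; then
      printf '%s\n' "$lb_ip"
      return 0
    fi
    attempts=$((attempts - 1))
    sleep "$interval"
  done
  echo "no external IP assigned to $svc_name" >&2
  return 1
}

# Usage:
#   export LOADBALANCER_PUBLIC_IP=$(wait_for_external_ip ${KAFKA_CLUSTER_NAME}-kafka-external-bootstrap)
```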



Create a file called client-ssl.properties with the following contents:

bootstrap.servers=[LOADBALANCER_PUBLIC_IP]:9094
security.protocol=SSL
ssl.truststore.location=[TRUSTSTORE_LOCATION]
# for JDK truststore, the default password is "changeit"
ssl.truststore.password=changeit
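If you prefer not to edit the placeholders by hand, the same file can be generated from shell variables. This is only a sketch: write_client_ssl_properties is a hypothetical helper, and the values passed in the usage comment are assumptions you should replace with your own.

```shell
# Write a Kafka client properties file for TLS access (hypothetical helper).
# $1: load balancer public IP
# $2: truststore location
# $3: truststore password
# $4: output file
write_client_ssl_properties() {
  cat > "$4" <<EOF
bootstrap.servers=$1:9094
security.protocol=SSL
ssl.truststore.location=$2
ssl.truststore.password=$3
EOF
}

# Usage (assumed values):
#   write_client_ssl_properties "$LOADBALANCER_PUBLIC_IP" "$KEYSTORE_LOCATION" changeit client-ssl.properties
```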



To use the Kafka CLI, download Kafka if you don't have it already - https://kafka.apache.org/downloads

All you need to do is run kafka-console-producer and kafka-console-consumer, pointing them to the client-ssl.properties file you just created

# replace with Kafka installation path, e.g. /Users/foobar/kafka_2.12-2.3.0
export KAFKA_HOME=[replace with Kafka installation path]
export LOADBALANCER_PUBLIC_IP=[replace with public IP of Load Balancer]
export TOPIC_NAME=test-strimzi-topic

# on a terminal, start producer and send a few messages
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --producer.config client-ssl.properties

# on another terminal, start consumer
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $LOADBALANCER_PUBLIC_IP:9094 --topic $TOPIC_NAME --consumer.config client-ssl.properties --from-beginning



You should see producer and consumer working in tandem. Great!

If you face SSL Handshake errors, please check whether the CA cert has been correctly imported along with its correct password. If the Kafka cluster is not reachable, ensure you are using the right value for the public IP
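One way to narrow down handshake problems, assuming the openssl command-line tool is installed, is to attempt a TLS connection to the bootstrap address while trusting only ca.crt. The helper names below (verify_ok, check_broker_tls) are hypothetical. The check relies on the "Verify return code: 0 (ok)" line that openssl s_client prints when the certificate chain validates.

```shell
# Succeeds if openssl s_client output on stdin reports a verified certificate chain.
verify_ok() {
  grep -q 'Verify return code: 0 (ok)'
}

# Attempt a TLS handshake with the external listener (hypothetical helper).
# $1: load balancer public IP
# $2: path to the cluster CA certificate, e.g. ca.crt
check_broker_tls() {
  openssl s_client -connect "$1:9094" -CAfile "$2" </dev/null 2>/dev/null | verify_ok
}

# Usage:
#   check_broker_tls "$LOADBALANCER_PUBLIC_IP" ca.crt && echo "TLS handshake verified"
```

If this check passes but the Kafka CLI still fails, the truststore import is the more likely culprit. If it fails outright, look at the IP address and the extracted ca.crt first.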

Now, let's try a programmatic client. Since the Java client's behavior (the required config properties) is the same as the CLI's, I am using a Go client to try something different. Don't worry if you are not a Go programmer; it should be easy to follow along. I will not walk through the entire program, just the part where we create the connection-related configuration.

Here is the snippet:

bootstrapServers = os.Getenv("KAFKA_BOOTSTRAP_SERVERS")
caLocation = os.Getenv("CA_CERT_LOCATION")
topic = os.Getenv("KAFKA_TOPIC")

config := &kafka.ConfigMap{"bootstrap.servers": bootstrapServers, "security.protocol": "SSL", "ssl.ca.location": caLocation}



Notice that the bootstrap.servers and security.protocol are the same as ones you used in the Kafka CLI client (same for Java as well). The only difference is that ssl.ca.location is used to point to the CA certificate directly as opposed to a truststore

If you have Go installed, you can try it out. Clone the Git repo...

git clone https://github.com/abhirockzz/kafka-kubernetes-strimzi
cd kafka-kubernetes-strimzi/part-2/go-client-app



.. and run the program:

# replace with loadbalancer_ip:9094, e.g. 42.42.424.424:9094
export KAFKA_BOOTSTRAP_SERVERS=[replace with loadbalancer_ip:9094]
export CA_CERT_LOCATION=[replace with path to ca.crt file which you downloaded]
export KAFKA_TOPIC=test-strimzi-topic

go run kafka-client.go



You should see logs similar to the following, confirming that messages are being produced and consumed

Press Ctrl+C to exit the app

started consumer
started producer delivery goroutine
started producer goroutine
delivered messaged test-strimzi-topic[0]@122
delivered messaged test-strimzi-topic[0]@123
delivered messaged test-strimzi-topic[0]@124
received message from test-strimzi-topic[0]@122: value-2020-06-08 16:23:05.913303 +0530 IST m=+0.020529419
received message from test-strimzi-topic[0]@123: value-2020-06-08 16:23:07.915252 +0530 IST m=+2.022455867
received message from test-strimzi-topic[0]@124: value-2020-06-08 16:23:09.915875 +0530 IST m=+4.023055601
received message from test-strimzi-topic[0]@125: value-2020-06-08 16:23:11.915977 +0530 IST m=+6.023134961
....


That's All for Now, But There Is More to Come!

So we made some progress! We now have a Kafka cluster on Kubernetes which is publicly accessible but is (partially) secure thanks to TLS encryption. We also did some sanity testing using not one, but two (different) client applications. In the next part, we'll improve this further...stay tuned!

 

 

 

 
