Kafka SSL Client Authentication in Multi-Tenancy Architecture
Apache Kafka is a key product not only for messaging but also for real-time data processing, among many other use cases. Cloud-hosted architectures claim to provide secure communication and general security. But when a server (producer) has to communicate with multiple clients (consumers), Kafka provides built-in support for SSL as well as user-based authentication. In this article, we will set up such an authentication mechanism step by step.
The solution is divided into three parts:
- SSL support for one or more brokers: Generate the key and the certificate for each machine in the cluster. You can use Java's keytool utility to accomplish this task. We will generate the key into a temporary KeyStore initially so that we can export it and sign it later with a CA.
- Kafka configuration (we used Kafka 2.3.0 built for Scala 2.11, i.e. kafka_2.11-2.3.0).
- Running the whole setup.
Instructions to Install This Use Case
SSL support for one or more brokers: as outlined above, we use Java's keytool utility to generate each key into a temporary KeyStore, and then export it and sign it with the CA.
We are going to use one Kafka server and two clients (consumers). The certificates are signed with our own self-signed CA, and the server and each client get their own TrustStore and KeyStore in JKS format.
Points to note:
- Create a folder in which to generate and keep all certificate files.
- Provide identical details and passwords for all certificates. In my case, I used:
          issuer=C = de, ST = RF, L = Mainz, O = Technaura, OU = consulting, CN = swarnava.c, emailAddress =  
Only for client2 did I give a different CN (a different user), for testing purposes. Generate your certificates carefully; otherwise, you will run into problems in the next part.
          keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA 
          openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 
          keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert  
          keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert 
          keytool -keystore kafka.client2.truststore.jks -alias CARoot -import -file ca-cert 
          keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file 
          openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial  
          keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert 
          keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed 
          keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA 
          keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file 
          openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial 
          keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert 
          keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed 
          keytool -keystore kafka.client2.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA 
          keytool -keystore kafka.client2.keystore.jks -alias localhost -certreq -file cert-file 
          openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial  
          keytool -keystore kafka.client2.keystore.jks -alias CARoot -import -file ca-cert 
          keytool -keystore kafka.client2.keystore.jks -alias localhost -import -file cert-signed 
Once everything is generated, you can see the generated files:
          λ ls 
          ca-cert ca-key cert-signed kafka.client1.truststore.jks kafka.client2.truststore.jks kafka.server.truststore.jks ca-cert.srl cert-file kafka.client1.keystore.jks kafka.client2.keystore.jks kafka.server.keystore.jks 
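The keystore commands above repeat the same five steps for the server and for each client. As a minimal sketch of that pattern, the following Python function only builds the command lines for one party and prints them; it does not run anything. The function name `signing_commands` is hypothetical, and the flags are copied from the commands above.

```python
from typing import List


def signing_commands(name: str, validity_days: int = 365) -> List[List[str]]:
    """Build (but do not run) the commands that create and sign one keystore.

    `name` is the label used in the file names above, for example
    "server", "client1" or "client2".
    """
    keystore = f"kafka.{name}.keystore.jks"
    days = str(validity_days)
    return [
        # 1. Generate the key pair into the party's keystore.
        ["keytool", "-keystore", keystore, "-alias", "localhost",
         "-validity", days, "-genkey", "-keyalg", "RSA"],
        # 2. Export a certificate signing request into cert-file.
        ["keytool", "-keystore", keystore, "-alias", "localhost",
         "-certreq", "-file", "cert-file"],
        # 3. Sign the request with the CA key, producing cert-signed.
        ["openssl", "x509", "-req", "-CA", "ca-cert", "-CAkey", "ca-key",
         "-in", "cert-file", "-out", "cert-signed", "-days", days,
         "-CAcreateserial"],
        # 4. Import the CA certificate into the keystore.
        ["keytool", "-keystore", keystore, "-alias", "CARoot",
         "-import", "-file", "ca-cert"],
        # 5. Import the signed certificate into the keystore.
        ["keytool", "-keystore", keystore, "-alias", "localhost",
         "-import", "-file", "cert-signed"],
    ]


if __name__ == "__main__":
    for party in ("server", "client1", "client2"):
        for argv in signing_commands(party):
            print(" ".join(argv))
```

Note that cert-file and cert-signed are reused for every party, so the steps for one party have to finish before the next party starts.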
Kafka Configuration
Change the server.properties file to include the line below:
          listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093 
          
Also add:
          ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 
          ssl.endpoint.identification.algorithm= 
          ssl.keymanager.algorithm=SunX509 
          ssl.keystore.location=<p>/kafka.server.keystore.jks 
          ssl.keystore.password=<p> 
          ssl.keystore.type=JKS 
          ssl.protocol=TLS 
          ssl.trustmanager.algorithm=PKIX 
          ssl.truststore.location=<p>/kafka.server.truststore.jks 
          ssl.truststore.password=<p> 
          ssl.truststore.type=JKS 
          authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 
          allow.everyone.if.no.acl.found=true 
          ssl.client.auth=required 
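Before starting the broker, it can help to confirm that the settings needed for client authentication are really present in server.properties. The sketch below is a simplified check under stated assumptions: `parse_properties` and `check_mutual_tls` are hypothetical helper names (not part of Kafka), and the parser only handles plain `key=value` lines and `#` comments, not the full Java properties syntax.

```python
def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_mutual_tls(props: dict) -> list:
    """Return a list of problems; an empty list means the basics are set."""
    problems = []
    if "SSL://" not in props.get("listeners", ""):
        problems.append("no SSL listener configured")
    if props.get("ssl.client.auth") != "required":
        problems.append("ssl.client.auth is not 'required'")
    for key in ("ssl.keystore.location", "ssl.truststore.location"):
        if not props.get(key):
            problems.append(f"{key} is missing")
    return problems


if __name__ == "__main__":
    sample = (
        "listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093\n"
        "ssl.client.auth=required\n"
    )
    # The sample omits the store locations on purpose, so two problems print.
    for problem in check_mutual_tls(parse_properties(sample)):
        print(problem)
```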
Then, create the three new client config files client-ssl.properties, client-ssl1.properties, and client-ssl2.properties inside kafka_2.11-2.3.0\config. Their contents follow in the order listed; note that the first two files both point to the client1 stores.
client-ssl.properties:
          security.protocol=SSL 
          ssl.truststore.location=<p>/kafka.client1.truststore.jks 
          ssl.truststore.password=<p> 
          ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 
          ssl.truststore.type=JKS 
          ssl.endpoint.identification.algorithm= 
          ssl.keystore.location=<p>/kafka.client1.keystore.jks 
          ssl.keystore.password=<p> 
          ssl.key.password=<p> 
client-ssl1.properties:
          security.protocol=SSL 
          ssl.truststore.location=<p>/kafka.client1.truststore.jks 
          ssl.truststore.password=<p> 
          ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 
          ssl.truststore.type=JKS 
          ssl.endpoint.identification.algorithm= 
          ssl.keystore.location=<p>/kafka.client1.keystore.jks 
          ssl.keystore.password=<p> 
          ssl.key.password=<p> 
client-ssl2.properties:
          security.protocol=SSL 
          ssl.truststore.location=<p>/kafka.client2.truststore.jks 
          ssl.truststore.password=<p> 
          ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 
          ssl.truststore.type=JKS 
          ssl.endpoint.identification.algorithm= 
          ssl.keystore.location=<p>/kafka.client2.keystore.jks 
          ssl.keystore.password=<p> 
          ssl.key.password=<p> 
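The client files differ only in which client's store files they point to, so they can be generated from one template. The following is a minimal sketch, assuming that `<p>` above stands for your own store directory in the location lines and your own password in the password lines, and that one password is shared by all stores (as recommended earlier). The function name `client_ssl_properties` and its parameters are hypothetical.

```python
def client_ssl_properties(client: str, store_dir: str, password: str) -> str:
    """Render the SSL client properties for one client, e.g. "client1".

    store_dir and password are placeholders for your own values.
    """
    settings = [
        ("security.protocol", "SSL"),
        ("ssl.truststore.location", f"{store_dir}/kafka.{client}.truststore.jks"),
        ("ssl.truststore.password", password),
        ("ssl.enabled.protocols", "TLSv1.2,TLSv1.1,TLSv1"),
        ("ssl.truststore.type", "JKS"),
        # Left empty, as in the files above, which turns off host name checks.
        ("ssl.endpoint.identification.algorithm", ""),
        ("ssl.keystore.location", f"{store_dir}/kafka.{client}.keystore.jks"),
        ("ssl.keystore.password", password),
        ("ssl.key.password", password),
    ]
    return "\n".join(f"{key}={value}" for key, value in settings) + "\n"


if __name__ == "__main__":
    # "/path/to/certs" and "changeit" are example values only.
    print(client_ssl_properties("client2", "/path/to/certs", "changeit"))
```

Writing the returned text to config\client-ssl2.properties would give a file equivalent to the last block above, with the placeholders filled in.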
          
First, start ZooKeeper and then Kafka:
           Zkserver 
           .\bin\windows\kafka-server-start.bat .\config\server.properties 
          
Then, open a new terminal and create a new topic:
          .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --topic test1 --partitions 1 --replication-factor 1 
          .\bin\windows\kafka-topics.bat --list --zookeeper localhost:2181 
          
After this, check the certificate that the broker presents on the SSL port:
          openssl s_client -debug -connect localhost:9093 -tls1 
It will return the following details at the end, which means your certificate was generated properly:
          Wtmz24ChQdgNcygKXLq1AHgDetoHz57hrx5f75/gh31nDdgHpv4xKyO40TSIH+8v 
          PqgvvrogH0lgLCwsJfqwPEJbWZjL6pvLsBfPB8NMICMXpL50ZA== 
          -----END CERTIFICATE----- 
          subject=C = de, ST = RF, L = Mainz, O = Technaura, OU = consulting, CN = swarnava.c 
          issuer=C = de, ST = RF, L = Mainz, O = Technaura, OU = consulting, CN = swarnava.c, emailAddress = swarnava.c@technaura.com 
           --- 
          Acceptable client certificate CA names 
          C = de, ST = RF, L = Mainz, O = Technaura, OU = consulting, CN = swarnava.c, emailAddress = swarnava.c@technaura.com 
          Client Certificate Types: ECDSA sign, RSA sign, DSA sign 
We will run the setup for three different scenarios: without authentication, with server-side authentication only, and with both server-side and client-side authentication.
Running the Whole Setup
The command for producing using console producer:
.\bin\windows\kafka-console-producer.bat --broker-list <broker host:port> --topic <topic-name> --producer.config config\<config file>
The command for consuming using console consumer:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server <server host:port> --topic <topic-name> --consumer.config config\<config file>
Without Authentication
Producer

Consumer

Only Server-Side Authentication
For this scenario, we created another topic, "test2".
Producer

Consumer 1

Consumer 2

Note: Check which config file is passed to each producer and consumer.
Server and Client-Side Authentication
To authorize only the user swarnava.c for the topic test2, use the following two commands:
          .\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=swarnava.c,OU=consulting,O=Technaura,L=Mainz,ST=RF,C=de" --cluster --producer --topic test2 
          .\bin\windows\kafka-acls.bat --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=swarnava.c,OU=consulting,O=Technaura,L=Mainz,ST=RF,C=de" --group=* --consumer --topic test2 
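The principal in the ACL commands lists the same certificate fields as the subject line printed by openssl s_client, but in reverse order and without spaces, which is how Kafka reports the distinguished name of an SSL client by default. The sketch below converts one form into the other. The function name `kafka_principal` is hypothetical, and the conversion assumes that no field value contains a comma or an escaped character.

```python
def kafka_principal(openssl_subject: str) -> str:
    """Turn an OpenSSL subject line into the User: principal used in the ACLs."""
    text = openssl_subject.strip()
    if text.startswith("subject="):
        text = text[len("subject="):]
    parts = []
    for field in text.split(","):
        key, _, value = field.partition("=")
        parts.append(f"{key.strip()}={value.strip()}")
    # OpenSSL prints the least specific field (C) first; the principal
    # starts with the most specific one (CN), so the order is reversed.
    return "User:" + ",".join(reversed(parts))


if __name__ == "__main__":
    subject = ("subject=C = de, ST = RF, L = Mainz, O = Technaura, "
               "OU = consulting, CN = swarnava.c")
    print(kafka_principal(subject))
```

If the principal in an ACL does not match the certificate's distinguished name exactly, the ACL does not apply to that client, so it is worth comparing the two strings before debugging anything else.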
          
Producer

Consumer 1 (with User – swarnava.c)

Consumer 2 (without User – swarnava.c)

Congrats, you are done. This completes the setup of SSL client authentication in Kafka.



