Apache Kafka Introduction, Installation, and Implementation Using .NET Core 6

We will go over Apache Kafka basics, installation, and operation, as well as a step-by-step implementation using a .NET Core 6 web application.

Prerequisites

  1. Visual Studio 2022 (or another IDE for .NET 6)
  2. .NET Core 6 SDK
  3. Java SDK (version 8 or later)
  4. Apache Kafka

Agenda

  1. Overview of event streaming
  2. Introduction to Apache Kafka and its architecture
  3. Main concepts and foundation of Kafka
  4. Different Kafka APIs
  5. Use cases of Apache Kafka
  6. Installation of Kafka on Windows 10
  7. Step-by-step implementation using a .NET Core 6 web application

Overview of Event Streaming

Events are the things that happen within our application as users interact with it. For example, signing up on a website or placing an order are both events.

Event Streaming Platform

The event streaming platform records different types of data, such as transactional, historical, and real-time data. The platform is also used to process events and allows different consumers to process results immediately.

An event-driven platform also allows us to monitor our business and real-time data coming from different kinds of devices, such as IoT devices. After analyzing the data, it helps provide a good customer experience based on different types of events and needs.

Introduction to Apache Kafka

Below are a few bullet points that describe Apache Kafka:

  1. Kafka is an open-source distributed event streaming platform.
  2. It was originally developed at LinkedIn and later open-sourced through the Apache Software Foundation.
  3. It is used to build high-throughput, real-time data pipelines and streaming applications.
  4. It is fault-tolerant, horizontally scalable, and durable, because events are persisted on disk.

Kafka Architecture

Main Concepts and Foundation of Kafka

1. Event

An event (or record) is the message that we read from and write to the Kafka server. Events represent the things that happen in our business world, and each one contains a key, a value, a timestamp, and other metadata headers. The key, value, and timestamp, in this case, are as follows:

  1. Key: an optional identifier for the event, also used to decide which partition the event is written to.
  2. Value: the actual payload of the event.
  3. Timestamp: the time at which the event was created or appended to the topic.

2. Producer

The producer is a client application that sends messages to the Kafka node or broker.

3. Consumer

The consumer is an application that receives data from Kafka.

4. Kafka Cluster

The Kafka cluster is a set of servers, known as brokers, that work together and share the workload among themselves.

5. Broker

The broker is a Kafka server that acts as an agent between the producer and the consumer, which communicate with each other via the broker.

6. Topic

Events are stored inside a “topic,” which is similar to a folder in which we store multiple files.


Each topic has one or more producers and consumers, which write data to and read data from the topic. Events in a topic can be read as often as needed because Kafka persists events; unlike other messaging systems, it does not remove messages after they are consumed.

7. Partitions

Topics are partitioned, meaning a topic is spread over multiple partitions created inside it. When the producer sends an event to the topic, it is stored inside one particular partition, and the consumer then reads events from the corresponding topic partition in sequence.

8. Offset

Kafka assigns one unique, sequential ID, called the offset, to each message stored inside a topic partition as it arrives from the producer.

9. Consumer Groups

In the Kafka world, a consumer group acts as a single logical unit: the consumers in a group divide the topic's partitions among themselves so that each partition is read by only one consumer in the group.

10. Replica

In Kafka, to make data fault-tolerant and highly available, we can replicate topics across different brokers and regions. So, if something goes wrong with the data in one replica, we can easily recover it from another.

Different Kafka APIs

Kafka has five core APIs that serve different purposes:

  1. Admin API: This API manages different topics, brokers, and Kafka objects.
  2. Producer API: This API is used to write/publish events to different Kafka topics.
  3. Consumer API: This API is used to receive messages from the topics the consumer has subscribed to.
  4. Kafka Streams API: This API is used to perform different types of operations like windowing, joins, and aggregation. Basically, it is used to transform data streams.
  5. Kafka Connect API: This API works as a connector to Kafka, which helps different systems connect with Kafka easily. It has different types of ready-to-use connectors related to Kafka.

Use Cases of Apache Kafka

  1. Messaging
  2. User activity tracking
  3. Log aggregation
  4. Stream processing
  5. Real-time data analytics

Installation of Kafka on Windows 10

Step 1

Download and install the Java SDK, version 8 or later.

Note: I have Java 11 installed; that is why I use the corresponding path in all the commands here. Adjust the path to your own Java version.

Step 2

Run the downloaded EXE to install it.

Step 3

Set the environment variable for Java using a command prompt opened as administrator.

Command:
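The command appeared as an image in the original post; a typical equivalent, assuming Java 11 is installed under the default path (adjust the path to your own JDK location), is:

```shell
setx JAVA_HOME "C:\Program Files\Java\jdk-11" /M
```

After running it, also make sure %JAVA_HOME%\bin is present in the Path variable, then reopen the command prompt so the change takes effect.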

Step 4

After that, download and install Apache Kafka.

Step 5

Extract the downloaded Kafka file and rename it “Kafka.”

Step 6

Open D:\Kafka\config\ and create “zookeeper-data” and “kafka-logs” folders inside it.

Step 7

Next, open the D:\Kafka\config\zookeeper.properties file and add the data folder path inside it:
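The exact snippet was shown as an image; with the folders created in Step 6, the relevant line in zookeeper.properties would be (Kafka accepts forward slashes in Windows paths):

```properties
dataDir=D:/Kafka/config/zookeeper-data
```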

Step 8

After that, open the D:\Kafka\config\server.properties file and change the log path in it:
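Again, the screenshot is missing; given the folder created in Step 6, the line in server.properties would be:

```properties
log.dirs=D:/Kafka/config/kafka-logs
```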

Step 9

Save and close both files.

Step 10

Run ZooKeeper:

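The command was shown as a screenshot; from the D:\Kafka directory, the standard Windows command is:

```shell
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
```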

Step 11

Start Kafka:

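In a second command prompt, again from the D:\Kafka directory, the standard command is:

```shell
.\bin\windows\kafka-server-start.bat .\config\server.properties
```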

Step 12

Create Kafka topic:

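The screenshot is missing; a typical command, using the topic name testdata that the .NET application below expects, is:

```shell
.\bin\windows\kafka-topics.bat --create --topic testdata --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
```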

Step 13

Create a console producer and send some messages:

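The command shown in the screenshot would typically be the console producer; each line you type is sent as a message:

```shell
.\bin\windows\kafka-console-producer.bat --topic testdata --bootstrap-server localhost:9092
```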

Step 14

Next, create a console consumer. You will then see the messages the producer sent:

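The corresponding console consumer command would be:

```shell
.\bin\windows\kafka-console-consumer.bat --topic testdata --from-beginning --bootstrap-server localhost:9092
```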

Step-by-Step Implementation

Let’s start with practical implementation.

Step 1

Create a new .NET Core Producer Web API:


Step 2

Configure your application:


Step 3

Provide additional details:


Step 4

Install the following two NuGet packages, Confluent.Kafka and Newtonsoft.Json:

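The package list was shown as an image; judging from the using directives in the code below, the two packages are Confluent.Kafka and Newtonsoft.Json, which can also be installed from the .NET CLI:

```shell
dotnet add package Confluent.Kafka
dotnet add package Newtonsoft.Json
```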

Step 5

Add configuration details inside the appsettings.json file:

JSON
 
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "producerconfiguration": {
    "bootstrapservers": "localhost:9092"
  },
  "TopicName": "testdata"
}


Step 6

Register a few services inside the “Program” class:

C#
 
using Confluent.Kafka;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
var producerConfiguration = new ProducerConfig();
builder.Configuration.Bind("producerconfiguration", producerConfiguration);

builder.Services.AddSingleton<ProducerConfig>(producerConfiguration);

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();


Step 7

Next, create the CarDetails model class:

C#
 
namespace ProducerApplication.Models
{
    public class CarDetails
    {
        public int CarId { get; set; }
        public string CarName { get; set; }
        public string BookingStatus { get; set; }
    }
}


Step 8

Now, create the CarsController class:

C#
 
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Newtonsoft.Json;
using ProducerApplication.Models;

namespace ProducerApplication.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class CarsController : ControllerBase
    {
        private ProducerConfig _configuration;
        private readonly IConfiguration _config;
        public CarsController(ProducerConfig configuration, IConfiguration config)
        {
            _configuration = configuration;
            _config = config;
        }
        [HttpPost("sendBookingDetails")]
        public async Task<ActionResult> Post([FromBody] CarDetails carDetails)
        {
            string serializedData = JsonConvert.SerializeObject(carDetails);

            var topic = _config.GetSection("TopicName").Value;

            using (var producer = new ProducerBuilder<Null, string>(_configuration).Build())
            {
                await producer.ProduceAsync(topic, new Message<Null, string> { Value = serializedData });
                producer.Flush(TimeSpan.FromSeconds(10));
                return Ok(true);
            }
        }
    }
}


Step 9

Finally, run the application and send a message:

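You can send a message from the Swagger UI, or with a curl request like the one below (the port and payload are examples; use the port your application actually listens on, and note that -k skips validation of the local development certificate):

```shell
curl -k -X POST https://localhost:7001/api/Cars/sendBookingDetails -H "Content-Type: application/json" -d "{\"carId\": 1, \"carName\": \"Swift\", \"bookingStatus\": \"Booked\"}"
```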

Step 10

Now, create a “consumer” application:

For that, create a new .NET Core console application:


Step 11

Configure your application:


Step 12

Provide additional information:


Step 13

Install the Confluent.Kafka NuGet package:


Step 14

Add the following code, which consumes the messages sent by the producer:

C#
 
using Confluent.Kafka;

var config = new ConsumerConfig
{
    GroupId = "gid-consumers",
    BootstrapServers = "localhost:9092",
    // Start reading from the beginning of the topic when no committed offset exists
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using (var consumer = new ConsumerBuilder<Null, string>(config).Build())
{
    consumer.Subscribe("testdata");
    while (true)
    {
        var bookingDetails = consumer.Consume();
        Console.WriteLine(bookingDetails.Message.Value);
    }
}


Step 15

Finally, run the producer and the consumer, then send a message using the producer app; you will immediately see the message sent by the producer inside the consumer console:


Here is the GitHub URL I used in this article. 

Conclusion

Here, we discussed an introduction to Apache Kafka, how it works, its benefits, and a step-by-step implementation using .NET Core 6.

Happy Coding!

 

 

 

 
