Getting started with Kafka using .NET Core 3.1

Apache Kafka is a distributed streaming platform that lets you build applications using a publish/subscribe model for messaging between services. Kafka is a powerful tool for building a communication infrastructure between applications and enabling microservice architectures.

In this post, I’m going to walk through building a simple producer and consumer using .NET Core.

The source code for the producer and consumer created in this blog post can be found on GitHub in the DotnetCoreKafkaExample Repository.

Install and Run Kafka

First, we need to install Kafka and get Zookeeper and the Kafka server running. In this example, we’re going to use a very simple Kafka deployment: a single Zookeeper instance and a single Kafka server running locally with the default configurations. Detailed steps for doing this can be found in the Kafka Quickstart, so I won’t go into too much detail, but here is what you need to do (most of this section is summarized from the Kafka Quickstart).

Download and extract the Kafka release (2.4.0 at the time of this post):

$ tar -xzf kafka_2.12-2.4.0.tgz
$ cd kafka_2.12-2.4.0

Start Zookeeper:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Open another terminal window, cd to kafka_2.12-2.4.0, and start the Kafka server:

$ bin/kafka-server-start.sh config/server.properties

Now you should have two terminal windows, one with Zookeeper running and another with the Kafka server running. In production, you would run these as daemons (with systemd, for example), but for local development it is easier to just run them in terminal windows.

Next, open another terminal window, cd to kafka_2.12-2.4.0, and create a topic called test:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

In development, we can leave replication-factor at 1 (we only have one server, after all) and partitions at 1, because we are only going to have one consumer. In production, you would increase replication-factor and partitions based on how many servers and consumers you have running, in order to achieve fault tolerance and scalability. Tuning those values is beyond the scope of this blog post, but the command below shows the general idea.
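Purely for illustration (this assumes a hypothetical three-broker cluster, not the single local server we set up above), a production topic might be created like this:

$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 6 --topic test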

Now run a command to verify that the test topic has been created:

$ bin/kafka-topics.sh --list --bootstrap-server localhost:9092

You should see a single line of output with your topic name, test.

If you want, you can use the kafka-console-producer and kafka-console-consumer scripts in Kafka’s bin directory to produce and consume some messages.
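For example (note that in Kafka 2.4.0 the console producer still takes --broker-list rather than --bootstrap-server):

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning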

Keep Kafka running and proceed to the next section where we will create a producer using .NET.

Create a Kafka Producer using .NET Core

Next, let’s create a simple Console application that produces messages to our Kafka test topic. I’m going to do this from the command line, but you can use Visual Studio, Rider, or whatever IDE you want.

$ mkdir KafkaProducer
$ cd KafkaProducer
$ dotnet new console

Now add the Confluent.Kafka NuGet package, which provides a library for using Kafka from .NET.

$ dotnet add package Confluent.Kafka

Now, edit Program.cs to create a producer that will send a message to Kafka every 5 seconds until we kill it with Ctrl+C:

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

namespace KafkaProducer
{
    class Program
    {
        public static async Task Main()
        {
            var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };
            
            // Create a producer that can be used to send messages to Kafka that have no key and a string value
            using var p = new ProducerBuilder<Null, string>(config).Build();

            var i = 0;
            while (true)
            {
                // Construct the message to send (generic type must match what was used above when creating the producer)
                var message = new Message<Null, string>
                {
                    Value = $"Message #{++i}"
                };

                // Send the message to our test topic in Kafka                
                var dr = await p.ProduceAsync("test", message);
                Console.WriteLine($"Produced message '{dr.Value}' to topic {dr.Topic}, partition {dr.Partition}, offset {dr.Offset}");
                
                // Wait 5 seconds before producing the next message
                await Task.Delay(5000);
            }
        }
    }
}

To keep things simple, I have omitted error handling, but in a production app you’ll want a try/catch around the ProduceAsync call, which throws a ProduceException<Null, string> (matching the producer’s generic types) if the message cannot be delivered.
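As a rough sketch (the details will depend on your app), that might look like this:

try
{
    var dr = await p.ProduceAsync("test", message);
    Console.WriteLine($"Produced message '{dr.Value}' to topic {dr.Topic}, partition {dr.Partition}, offset {dr.Offset}");
}
catch (ProduceException<Null, string> e)
{
    // e.Error describes why delivery failed (broker unreachable, message too large, etc.)
    Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}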

Now let’s run the producer to make sure it works (or build and run if you’re using an IDE):

$ dotnet run -p KafkaProducer.csproj

Let it produce a few messages and then press Ctrl+C to kill it. Your output should look something like this:

Produced message 'Message #1' to topic test, partition [0], offset 0
Produced message 'Message #2' to topic test, partition [0], offset 1
Produced message 'Message #3' to topic test, partition [0], offset 2

Create a Kafka Consumer using .NET Core

Now let’s create another project to consume messages from our test topic.

Like our producer, this will be a console app:

$ cd ..
$ mkdir KafkaConsumer
$ cd KafkaConsumer
$ dotnet new console

And we will be using Confluent.Kafka again, so add that NuGet package:

$ dotnet add package Confluent.Kafka

Next, edit Program.cs to implement the consumer:

using System;
using System.Threading;
using Confluent.Kafka;

namespace KafkaConsumer
{
    class Program
    {
        static void Main()
        {
            var conf = new ConsumerConfig
            { 
                GroupId = "test-consumer-group",
                BootstrapServers = "localhost:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using var c = new ConsumerBuilder<Ignore, string>(conf).Build();
            c.Subscribe("test");

            // Because Consume is a blocking call, we want to capture Ctrl+C and use a cancellation token to get out of our while loop and close the consumer gracefully.
            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    // Consume a message from the test topic. Pass in a cancellation token so we can break out of our loop when Ctrl+C is pressed
                    var cr = c.Consume(cts.Token);
                    Console.WriteLine($"Consumed message '{cr.Value}' from topic {cr.Topic}, partition {cr.Partition}, offset {cr.Offset}");

                    // Do something interesting with the message you consumed
                }
            }
            catch (OperationCanceledException)
            {
            }
            finally
            {
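                // Commit final offsets and leave the consumer group cleanly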
                c.Close();
            }
        }
    }
}

Now run the consumer:

$ dotnet run -p KafkaConsumer.csproj

and it should consume the messages your KafkaProducer produced earlier. For example:

Consumed message 'Message #1' from topic test, partition [0], offset 0
Consumed message 'Message #2' from topic test, partition [0], offset 1
Consumed message 'Message #3' from topic test, partition [0], offset 2

A Quick Aside About GroupId

While constructing the ConsumerConfig, we set GroupId = "test-consumer-group". What does this do?

Kafka works like a queue in many respects, but unlike a queue, messages are not removed when they are read.

Each consumer has a GroupId, and multiple consumers can share the same GroupId. Kafka delivers each message to only one consumer in a given GroupId, so the consumers in a group don’t have to worry about accidentally processing the same message as one another.

Alternatively, giving different consumers different GroupIds allows those consumers to process the same messages independently, as in the sketch below.
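For example, a second consumer that should independently receive every message only needs a different GroupId (the group name here is just illustrative):

var conf = new ConsumerConfig
{
    GroupId = "another-consumer-group", // different group, so this consumer gets its own copy of every message
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest
};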

One final note about message deletion… I mentioned above that Kafka does not remove a message when it is consumed, but Kafka topics do have a retention period, after which messages are removed, so they are not stored indefinitely. The default retention policy for newly created topics is configured in the server.properties file that you pass in when starting the Kafka server process.
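In the stock server.properties that ships with Kafka 2.4.0, the relevant settings look like this (168 hours, i.e. 7 days, is the default):

log.retention.hours=168
log.retention.check.interval.ms=300000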

Bringing it all together

So, with the KafkaConsumer still running, start the KafkaProducer back up, and you should see messages being produced and consumed.

Just for fun, you could start up a second consumer: because both consumers have the same GroupId, you will see each message processed by only one of them. If you give each consumer a different GroupId, you will see them both process every message.

Where to go from here

This post walked you through creating a simple Kafka producer and consumer using .NET Core.

For more information about using Kafka with .NET Core, check out the confluent-kafka-dotnet GitHub repo, which has more examples as well as tips for optimizing a producer for high-volume scenarios (in short, queue up many messages with the non-blocking Produce method and flush them to Kafka periodically, instead of awaiting each one individually).
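As a rough sketch of that approach (reusing the producer p from earlier; the message count and timeout here are arbitrary):

// Queue many messages without awaiting each delivery individually
for (var i = 0; i < 100; i++)
{
    p.Produce("test", new Message<Null, string> { Value = $"Message #{i}" },
        dr => Console.WriteLine(dr.Error.IsError
            ? $"Delivery failed: {dr.Error.Reason}"
            : $"Delivered to {dr.TopicPartitionOffset}"));
}

// Block until all outstanding messages have been delivered (or the timeout expires)
p.Flush(TimeSpan.FromSeconds(10));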

For more information on how to configure Kafka itself, check out the Apache Kafka Documentation.