Welcome to day 1 of the OVO tech blog advent calendar 2020!
For 24 days we will publish daily posts on different topics related to tech, product, UX and accessibility.
Why would you want to move the Kafka offset?
When working with computers and software one thing is certain: at some point things will go wrong. It's the same when writing Kafka consumers. If you haven't come across them before, Kafka consumers read messages from Kafka topics, which are ordered logs of messages streaming onto a specific feed.
When you have issues picking up those messages, you have different options:
- you can create a separate topic for messages that failed (a dead-letter topic)
- you can block processing on the last failed message, so that once you fix your application it can process the messages correctly
- you can log an error and carry on with the remaining messages
Every approach has its pros and cons. In our case we decided to use the last one. If a message representing the current state of an electric vehicle charger fails to process, it's not a problem, as another update will arrive soon. Other topics, on the other hand, carry slow-moving data, for example successful new charger installations, and we have to process every message on them. If an error happens in our service, we move the offset back to an earlier value so that all messages since the failure are reprocessed.
In this post I will show you how to use the tooling provided with Kafka to check the current offset values and move an offset back in time.
Getting started
The first thing to do is to find out which Kafka version you are using. In my case it's 2.6. Go to https://kafka.apache.org/downloads and download the matching version; I'm using kafka_2.13-2.6.0.tgz for this article.
Once you have downloaded and extracted it, you will see several directories inside; everything we need is in the bin directory.
You can run Kafka locally and practise moving an offset before you connect to the environment where you actually want to move it. As always, test on an environment other than production first, especially when you are doing this for the first time.
Kafka comes with ready-made configuration files, so you can run it immediately after downloading.
Start ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
Now you can start Kafka:
./kafka-server-start.sh ../config/server.properties
You will see a lot of logs, but at the end you should hopefully see INFO [KafkaServer id=0] started (kafka.server.KafkaServer).
Running this command will list all topics:
./kafka-topics.sh --list --bootstrap-server localhost:9092
If you haven't created any topics before, it should give you an empty response.
You can create a topic with
./kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 --partitions 1 \
--topic demo
Run the list command again and you will see the demo topic on the list.
Run a producer in one terminal tab:
./kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic demo
You can send a few messages; pressing enter sends each message. I'm sending three messages:
>1
>2
>3
Now run a consumer in another tab (and don't stop it):
./kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--group demo-consumer \
--topic demo \
--from-beginning
If everything has worked correctly up to this point, you should see all the messages consumed. Now we can look at the consumer group's description with this command:
./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group demo-consumer \
--describe
This is the output I see locally. Your consumer id will have a different value, but everything else should look the same. If you stop and restart the consumer, the consumer id will change.
GROUP          TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                     HOST        CLIENT-ID
demo-consumer  demo   0          3               3               0    consumer-demo-consumer-1-3b863120-079c-4afb-ace9-6fa1a690399b  /127.0.0.1  consumer-demo-consumer-1
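The columns are related in a simple way: LAG is just LOG-END-OFFSET minus CURRENT-OFFSET, i.e. how many messages the group still has to process. A quick shell check with the values from this output:

```shell
# Values copied from the describe output above
current_offset=3    # last committed position of the demo-consumer group
log_end_offset=3    # offset where the next message will be written on the partition
lag=$((log_end_offset - current_offset))
echo "lag=$lag"
```

A lag of 0 means the consumer is fully caught up; after we move the offset back to 2 later on, the same arithmetic gives a lag of 1.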
Now, if we want to move the offset, we can first do a dry run, which tells us what would change. You can target a partition by appending :partition number to the topic name, in our case demo:0. Because the topic has only one partition we could also provide just --topic demo. If you want to move offsets on all partitions, for example to a specific date or to earliest/latest, don't specify a partition at all.
./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group demo-consumer \
--topic demo:0 \
--reset-offsets \
--to-offset 2 \
--dry-run
If your consumer is still running, you will get this error:
Error: Assignments can only be reset if the group 'demo-consumer' is inactive, but the current state is Stable.
You have to stop the consumer first; otherwise it could commit its latest offset while you are moving it and overwrite your change. Stop the consumer and run describe again:
./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group demo-consumer \
--describe
You should see the message Consumer group 'demo-consumer' has no active members. and the CONSUMER-ID, HOST and CLIENT-ID fields should be empty.
Run the dry-run command again. This time you should see what the outcome of moving the offset will be:
GROUP TOPIC PARTITION NEW-OFFSET
demo-consumer demo 0 2
Now all we have to do is to execute the change.
./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--group demo-consumer \
--topic demo:0 \
--reset-offsets \
--to-offset 2 \
--execute
If you describe the consumer group now, you will see that its current offset is 2 and the lag is 1. Run the consumer again and it will consume the last message.
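To see why resetting to offset 2 replays exactly the last message: offsets are zero-based, so our three messages sit at offsets 0, 1 and 2, and a reset moves the position the consumer resumes at. A tiny bash simulation, no broker needed:

```shell
messages=(1 2 3)   # the three messages we produced, stored at offsets 0, 1 and 2
new_offset=2       # the value we passed to --to-offset
# Consumption resumes at new_offset, so everything from there onward is re-read
for ((i = new_offset; i < ${#messages[@]}; i++)); do
  echo "replaying offset $i: ${messages[$i]}"
done
```

Only "replaying offset 2: 3" is printed, matching what the restarted console consumer shows.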
Running with authentication
So far we have only moved an offset locally, which is much easier because you don't have to authenticate to Kafka. In this section we will look at how to use the same scripts with authentication. In my case our Kafka runs on Aiven, but as long as you can get the access certificate, the key and the CA certificate, you will be able to follow these steps.
First download ca.pem and a user's certificate and key. Make sure the user has access to the topic; in my case, because I want to produce to and consume from the topic, it's a user with read/write access to the demo topic. If you are using Aiven, you can download all three from the service's overview page in the console. For simplicity I copy them into a dev directory inside bin with all the Kafka scripts, but you can keep them somewhere else.



Now, inside the dev directory, create a bash script called generate_jks.sh to generate the jks files.
#!/bin/bash
set -e
NAME=$1
# Pack the user key and certificate into a PKCS12 keystore
openssl pkcs12 -export -inkey service.key -in service.cert -out ${NAME}.keystore.p12 -name service_key
# Import the CA certificate into a JKS truststore
keytool -import -file ca.pem -alias CA -keystore ${NAME}.truststore.jks
You will be asked for a password for each file; we will need these passwords later, so keep them. You also have to answer yes when keytool asks whether to trust the certificate.
./generate_jks.sh dev
After running this command you should see two new files: dev.truststore.jks and dev.keystore.p12.
Create a config file named config inside the dev directory. Make sure you put your own passwords in place of password.
security.protocol=SSL
ssl.truststore.location=./dev/dev.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=./dev/dev.keystore.p12
ssl.keystore.password=password
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.type=JKS
ssl.keystore.type=PKCS12
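Depending on how the keystore was generated, you may also need the standard ssl.key.password client property, which holds the password of the private key inside the keystore; with a PKCS12 store it is often the same as the keystore password. Shown here with a placeholder value:

```
ssl.key.password=password
```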
Now we can connect a consumer and a producer to our topic in the dev environment. Replace host:port with your service URL:
./kafka-console-consumer.sh \
--bootstrap-server host:port \
--group demo-consumer \
--consumer.config ./dev/config \
--topic demo \
--from-beginning
./kafka-console-producer.sh \
--broker-list host:port \
--producer.config ./dev/config \
--topic demo
Publish a few messages, then stop the consumer. Describe the consumer group and move the offset with the following commands:
./kafka-consumer-groups.sh \
--bootstrap-server host:port \
--command-config ./dev/config \
--group demo-consumer \
--describe
./kafka-consumer-groups.sh \
--bootstrap-server host:port \
--command-config ./dev/config \
--group demo-consumer \
--topic demo \
--reset-offsets \
--to-earliest \
--dry-run
./kafka-consumer-groups.sh \
--bootstrap-server host:port \
--command-config ./dev/config \
--group demo-consumer \
--topic demo \
--reset-offsets \
--to-earliest \
--execute
If you want to move to a specific date, you can use --to-datetime with a timestamp in yyyy-MM-ddTHH:mm:ss.SSS format instead of --to-earliest. The tool also supports --to-latest, --to-offset (as we used earlier) and --shift-by for moving by a relative number of messages.
./kafka-consumer-groups.sh \
--bootstrap-server host:port \
--command-config ./dev/config \
--group demo-consumer \
--topic demo \
--reset-offsets \
--to-datetime '2020-12-01T00:00:00.000' \
--execute
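If you'd rather not construct the timestamp by hand, GNU date can produce the format that --to-datetime expects (a sketch; the BSD/macOS date command uses different flags):

```shell
# Format an arbitrary local time as yyyy-MM-ddTHH:mm:ss.SSS (GNU date;
# the trailing .000 is literal text in the format string)
date -d '2020-12-01 00:00:00' '+%Y-%m-%dT%H:%M:%S.000'
```

This prints 2020-12-01T00:00:00.000, ready to paste into the command above.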
Summary
In this post we walked through moving an offset when running Kafka locally without any authentication, and when connecting to Kafka over SSL. We explored how to check what will change as a result of moving an offset and which flags move it to different positions. We also used the console producer and consumer that ship with Kafka to simulate applications. Hopefully you are now prepared for when you have to do this for your own consumers.