Reset offset of Kafka consumer group.
Preparations
Create sample keep-up-with-topic
topic.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic keep-up-with-topic --replication-factor 3 --partitions 5
Created topic keep-up-with-topic.
Display topic details.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic keep-up-with-topic
Topic: keep-up-with-topic TopicId: oBWzsfSAQo-63ym7h2Slmw PartitionCount: 5 ReplicationFactor: 3 Configs: segment.bytes=1073741824 Topic: keep-up-with-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: keep-up-with-topic Partition: 1 Leader: 2 Replicas: 2,3,5 Isr: 2,3,5 Topic: keep-up-with-topic Partition: 2 Leader: 3 Replicas: 3,5,4 Isr: 3,5,4 Topic: keep-up-with-topic Partition: 3 Leader: 5 Replicas: 5,4,1 Isr: 5,4,1 Topic: keep-up-with-topic Partition: 4 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2
Publish several messages.
$ echo "hello_msg: {'who':'$(whoami)','where':'$(hostname)','when':'$(date)'}" | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --property parse.key=true --property key.separator=:
Consume these messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
CreateTime:1624655927380 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:46 UTC 2021'} CreateTime:1624655930318 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:49 UTC 2021'} CreateTime:1624656092851 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:21:31 UTC 2021'} CreateTime:1624656142408 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:21 UTC 2021'} CreateTime:1624656149136 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:27 UTC 2021'} CreateTime:1624656159306 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:38 UTC 2021'} CreateTime:1624656197636 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:16 UTC 2021'} CreateTime:1624656214769 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'} CreateTime:1624656217408 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'} ^CProcessed a total of 9 messages
Get the offset
Get the offset for the latest timestamp.
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic keep-up-with-topic --time -1
keep-up-with-topic:0:0 keep-up-with-topic:1:0 keep-up-with-topic:2:0 keep-up-with-topic:3:9 keep-up-with-topic:4:0
Get the offset for the earliest timestamp.
kafka@kafka5:~/kafka$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic keep-up-with-topic --time -2
keep-up-with-topic:0:0 keep-up-with-topic:1:0 keep-up-with-topic:2:0 keep-up-with-topic:3:0 keep-up-with-topic:4:0
Get the offset for the specific timestamp.
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic keep-up-with-topic --time 1624656214765
keep-up-with-topic:0: keep-up-with-topic:1: keep-up-with-topic:2: keep-up-with-topic:3:7 keep-up-with-topic:4:
Change the offset
Disconnect the consumer as it needs to be inactive to perform the assignment.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-console-client --topic keep-up-with-topic --reset-offsets --to-earliest
Error: Assignments can only be reset if the group 'my-console-client' is inactive, but the current state is Stable. GROUP TOPIC PARTITION NEW-OFFSET
The consumer group should not have any active members.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-console-client
Consumer group 'my-console-client' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID my-console-client keep-up-with-topic 4 0 0 0 - - - my-console-client keep-up-with-topic 2 0 0 0 - - - my-console-client keep-up-with-topic 3 9 9 0 - - - my-console-client keep-up-with-topic 0 0 0 0 - - - my-console-client keep-up-with-topic 1 0 0 0 - - -
Change offset to the earliest one.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-console-client --topic keep-up-with-topic --reset-offsets --to-earliest --execute
GROUP TOPIC PARTITION NEW-OFFSET my-console-client keep-up-with-topic 3 0 my-console-client keep-up-with-topic 2 0 my-console-client keep-up-with-topic 0 0 my-console-client keep-up-with-topic 4 0 my-console-client keep-up-with-topic 1 0
Consume these messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
CreateTime:1624655927380 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:46 UTC 2021'} CreateTime:1624655930318 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:49 UTC 2021'} CreateTime:1624656092851 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:21:31 UTC 2021'} CreateTime:1624656142408 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:21 UTC 2021'} CreateTime:1624656149136 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:27 UTC 2021'} CreateTime:1624656159306 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:38 UTC 2021'} CreateTime:1624656197636 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:16 UTC 2021'} CreateTime:1624656214769 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'} CreateTime:1624656217408 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'} ^CProcessed a total of 9 messages
Change offset to the latest one.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-console-client --topic keep-up-with-topic --reset-offsets --to-latest --execute
GROUP TOPIC PARTITION NEW-OFFSET my-console-client keep-up-with-topic 3 9 my-console-client keep-up-with-topic 2 0 my-console-client keep-up-with-topic 0 0 my-console-client keep-up-with-topic 4 0 my-console-client keep-up-with-topic 1 0
There are no new messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
^CProcessed a total of 0 messages
Change offset to the specific one.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-console-client --topic keep-up-with-topic --reset-offsets --to-offset 7 --execute
[2021-06-25 21:37:31,190] WARN New offset (7) is higher than latest offset for topic partition keep-up-with-topic-2. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$) [2021-06-25 21:37:31,191] WARN New offset (7) is higher than latest offset for topic partition keep-up-with-topic-0. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$) [2021-06-25 21:37:31,191] WARN New offset (7) is higher than latest offset for topic partition keep-up-with-topic-4. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$) [2021-06-25 21:37:31,191] WARN New offset (7) is higher than latest offset for topic partition keep-up-with-topic-1. Value will be set to 0 (kafka.admin.ConsumerGroupCommand$) GROUP TOPIC PARTITION NEW-OFFSET my-console-client keep-up-with-topic 3 7 my-console-client keep-up-with-topic 2 0 my-console-client keep-up-with-topic 0 0 my-console-client keep-up-with-topic 4 0 my-console-client keep-up-with-topic 1 0
Consume these messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
CreateTime:1624656214769 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'} CreateTime:1624656217408 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'} ^CProcessed a total of 2 messages
Change offset to the specific datetime (1624656159306
).
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-console-client --topic keep-up-with-topic --reset-offsets --to-datetime 2021-06-25T21:22:39.306 --execute
GROUP TOPIC PARTITION NEW-OFFSET my-console-client keep-up-with-topic 3 5 my-console-client keep-up-with-topic 2 0 my-console-client keep-up-with-topic 0 0 my-console-client keep-up-with-topic 4 0 my-console-client keep-up-with-topic 1 0
Consume these messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
CreateTime:1624656159306 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:38 UTC 2021'} CreateTime:1624656197636 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:16 UTC 2021'} CreateTime:1624656214769 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'} CreateTime:1624656217408 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'} ^CProcessed a total of 4 messages
Additional notes
You can also use kafka-streams-application-reset.sh
to change current offset.
$ bin/kafka-streams-application-reset.sh --bootstrap-servers localhost:9092 --input-topics keep-up-with-topic --application-id my-console-client --to-earliest
Reset-offsets for input topics [keep-up-with-topic] Following input topics offsets will be reset to (for consumer group my-console-client) Topic: keep-up-with-topic Partition: 4 Offset: 0 Topic: keep-up-with-topic Partition: 2 Offset: 0 Topic: keep-up-with-topic Partition: 3 Offset: 0 Topic: keep-up-with-topic Partition: 0 Offset: 0 Topic: keep-up-with-topic Partition: 1 Offset: 0 Done. Deleting all internal/auto-created topics for application my-console-client Done.
I won’t repeat each detail here, you can use it to perform every operation described earlier.