Delete all messages in a Kafka topic, or only the messages before a specific offset.
Preparations
Create a sample topic named keep-up-with-topic.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic keep-up-with-topic --replication-factor 3 --partitions 5
Created topic keep-up-with-topic.
Display topic details.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic keep-up-with-topic
Topic: keep-up-with-topic  TopicId: oBWzsfSAQo-63ym7h2Slmw  PartitionCount: 5  ReplicationFactor: 3  Configs: segment.bytes=1073741824
    Topic: keep-up-with-topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
    Topic: keep-up-with-topic  Partition: 1  Leader: 2  Replicas: 2,3,5  Isr: 2,3,5
    Topic: keep-up-with-topic  Partition: 2  Leader: 3  Replicas: 3,5,4  Isr: 3,5,4
    Topic: keep-up-with-topic  Partition: 3  Leader: 5  Replicas: 5,4,1  Isr: 5,4,1
    Topic: keep-up-with-topic  Partition: 4  Leader: 4  Replicas: 4,1,2  Isr: 4,1,2
Publish several messages by running the following command a few times.
$ echo "hello_msg: {'who':'$(whoami)','where':'$(hostname)','when':'$(date)'}" | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --property parse.key=true --property key.separator=:
Consume these messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
CreateTime:1624655927380 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:46 UTC 2021'}
CreateTime:1624655930318 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:49 UTC 2021'}
CreateTime:1624656092851 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:21:31 UTC 2021'}
CreateTime:1624656142408 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:21 UTC 2021'}
CreateTime:1624656149136 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:27 UTC 2021'}
CreateTime:1624656159306 hello_msg {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:38 UTC 2021'}
CreateTime:1624656197636 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:16 UTC 2021'}
CreateTime:1624656214769 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'}
CreateTime:1624656217408 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'}
^CProcessed a total of 9 messages
Delete messages up to the specific offset
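Get the offset of the first message whose timestamp is at or after a specific timestamp (in milliseconds since the epoch). Partitions without such a message report an empty offset.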
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic keep-up-with-topic --time 1624656214765
keep-up-with-topic:0:
keep-up-with-topic:1:
keep-up-with-topic:2:
keep-up-with-topic:3:7
keep-up-with-topic:4:
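The value passed to --time is expressed in milliseconds since the Unix epoch. As a minimal sketch, assuming GNU date is available (its -d option is not portable to BSD or macOS), a readable UTC time can be converted into that form. The helper name to_epoch_ms is hypothetical and not part of Kafka.

```shell
# Hypothetical helper: convert a UTC date string to milliseconds since the epoch.
# Assumes GNU date; BSD/macOS date does not accept -d in this form.
to_epoch_ms() {
  seconds=$(date -u -d "$1" +%s) || return 1
  echo "$((seconds * 1000))"
}

to_epoch_ms '2021-06-25 21:23:34'   # prints 1624656214000
```

The result has second precision only; adjust the last three digits by hand when a finer boundary is needed, as in the 1624656214765 value used above.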
Create a definition that reflects the above results.
$ cat <<EOF | tee keep-up-with-topic.offset.json
{
  "partitions": [
    {
      "topic": "keep-up-with-topic",
      "partition": 3,
      "offset": 7
    }
  ],
  "version": 1
}
EOF
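Writing the definition by hand gets tedious when several partitions report an offset. The following is a minimal sketch, assuming the GetOffsetShell output arrives as one topic:partition:offset entry per line, that builds the same JSON with awk. The function name offsets_to_json is hypothetical.

```shell
# Hypothetical helper: read "topic:partition:offset" lines on stdin and print
# the JSON definition expected by kafka-delete-records.sh.
# Entries with an empty offset (no matching message on that partition) are skipped.
offsets_to_json() {
  awk -F: '
    BEGIN { printf "{\n  \"partitions\": [\n" }
    NF == 3 && $3 != "" {
      if (n++) printf ",\n"
      printf "    { \"topic\": \"%s\", \"partition\": %d, \"offset\": %d }", $1, $2, $3
    }
    END { printf "\n  ],\n  \"version\": 1\n}\n" }
  '
}

# Sample input shaped like the tool output; only partition 3 has an offset.
printf 'keep-up-with-topic:0:\nkeep-up-with-topic:3:7\n' | offsets_to_json
```

In practice the GetOffsetShell command can be piped straight into offsets_to_json and then into tee keep-up-with-topic.offset.json.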
Use it to delete all messages before the located offset on the selected partitions. The message at that offset is kept.
$ bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file keep-up-with-topic.offset.json
Executing records delete operation
Records delete operation completed:
partition: keep-up-with-topic-3  low_watermark: 7
Reset the consumer group offsets so that the example consumer reads the topic from the beginning again.
$ bin/kafka-streams-application-reset.sh --bootstrap-servers localhost:9092 --input-topics keep-up-with-topic --application-id my-console-client --to-earliest
Reset-offsets for input topics [keep-up-with-topic]
Following input topics offsets will be reset to (for consumer group my-console-client)
Topic: keep-up-with-topic Partition: 4 Offset: 0
Topic: keep-up-with-topic Partition: 2 Offset: 0
Topic: keep-up-with-topic Partition: 3 Offset: 7
Topic: keep-up-with-topic Partition: 0 Offset: 0
Topic: keep-up-with-topic Partition: 1 Offset: 0
Done.
Deleting all internal/auto-created topics for application my-console-client
Done.
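The reset tool used above is aimed at Kafka Streams applications. For a plain consumer group, kafka-consumer-groups.sh offers a comparable reset. The wrapper below is only a sketch: the function name and the KAFKA_HOME variable are assumptions, and the reset is expected to work only while the group has no active consumers.

```shell
# Hypothetical wrapper around kafka-consumer-groups.sh.
# Arguments: bootstrap server, consumer group, topic.
# KAFKA_HOME is an assumed variable pointing at the Kafka installation;
# it defaults to the current directory, matching the bin/... paths above.
reset_group_to_earliest() {
  "${KAFKA_HOME:-.}/bin/kafka-consumer-groups.sh" \
    --bootstrap-server "$1" --group "$2" --topic "$3" \
    --reset-offsets --to-earliest --execute
}
```

For this example it would be called as reset_group_to_earliest localhost:9092 my-console-client keep-up-with-topic.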
Inspect the remaining messages.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
CreateTime:1624656214769 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'}
CreateTime:1624656217408 hello_msg {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'}
^CProcessed a total of 2 messages
Delete all messages
Create a definition that lists every partition of the topic with offset -1, which stands for the most recent offset (the high watermark) of that partition.
$ cat <<EOF | tee keep-up-with-topic.all.json
{
  "partitions": [
    { "topic": "keep-up-with-topic", "partition": 0, "offset": -1 },
    { "topic": "keep-up-with-topic", "partition": 1, "offset": -1 },
    { "topic": "keep-up-with-topic", "partition": 2, "offset": -1 },
    { "topic": "keep-up-with-topic", "partition": 3, "offset": -1 },
    { "topic": "keep-up-with-topic", "partition": 4, "offset": -1 }
  ],
  "version": 1
}
EOF
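For topics with many partitions, the same definition can be generated instead of typed. This is a minimal sketch, assuming partitions are numbered from 0 to count minus 1 (as in the topic description above); the function name all_partitions_json is hypothetical.

```shell
# Hypothetical helper: print a delete-records definition that covers
# partitions 0..count-1 of a topic, each with offset -1 (high watermark).
# Arguments: topic name, partition count.
all_partitions_json() {
  topic=$1
  count=$2
  printf '{\n  "partitions": [\n'
  i=0
  while [ "$i" -lt "$count" ]; do
    # Every entry except the last one is followed by a comma.
    if [ "$i" -lt "$((count - 1))" ]; then sep=","; else sep=""; fi
    printf '    { "topic": "%s", "partition": %d, "offset": -1 }%s\n' "$topic" "$i" "$sep"
    i=$((i + 1))
  done
  printf '  ],\n  "version": 1\n}\n'
}

all_partitions_json keep-up-with-topic 5
```

The output can be redirected to keep-up-with-topic.all.json and passed to kafka-delete-records.sh with --offset-json-file.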
Use it to delete all messages.
$ bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file keep-up-with-topic.all.json
Executing records delete operation
Records delete operation completed:
partition: keep-up-with-topic-2  low_watermark: 0
partition: keep-up-with-topic-3  low_watermark: 9
partition: keep-up-with-topic-0  low_watermark: 0
partition: keep-up-with-topic-1  low_watermark: 0
partition: keep-up-with-topic-4  low_watermark: 0
Reset the consumer group offsets again so that the example consumer starts from the earliest available offset.
$ bin/kafka-streams-application-reset.sh --bootstrap-servers localhost:9092 --input-topics keep-up-with-topic --application-id my-console-client --to-earliest
Reset-offsets for input topics [keep-up-with-topic]
Following input topics offsets will be reset to (for consumer group my-console-client)
Topic: keep-up-with-topic Partition: 4 Offset: 0
Topic: keep-up-with-topic Partition: 2 Offset: 0
Topic: keep-up-with-topic Partition: 3 Offset: 9
Topic: keep-up-with-topic Partition: 0 Offset: 0
Topic: keep-up-with-topic Partition: 1 Offset: 0
Done.
Deleting all internal/auto-created topics for application my-console-client
Done.
No messages remain in the topic.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true --property print.timestamp=true
^CProcessed a total of 0 messages