How to delete Kafka messages

Delete all messages from a Kafka topic, or only the messages up to a specific offset.

Preparations

Create a sample topic named keep-up-with-topic.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic keep-up-with-topic --replication-factor 3 --partitions 5
Created topic keep-up-with-topic.

Display topic details.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic keep-up-with-topic
Topic: keep-up-with-topic       TopicId: oBWzsfSAQo-63ym7h2Slmw PartitionCount: 5       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: keep-up-with-topic       Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: keep-up-with-topic       Partition: 1    Leader: 2       Replicas: 2,3,5 Isr: 2,3,5
        Topic: keep-up-with-topic       Partition: 2    Leader: 3       Replicas: 3,5,4 Isr: 3,5,4
        Topic: keep-up-with-topic       Partition: 3    Leader: 5       Replicas: 5,4,1 Isr: 5,4,1
        Topic: keep-up-with-topic       Partition: 4    Leader: 4       Replicas: 4,1,2 Isr: 4,1,2

Publish several messages by running the following command a few times.

$ echo  "hello_msg: {'who':'$(whoami)','where':'$(hostname)','when':'$(date)'}" | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --property parse.key=true --property key.separator=:

Consume these messages.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true  --property print.timestamp=true 
CreateTime:1624655927380        hello_msg        {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:46 UTC 2021'}
CreateTime:1624655930318        hello_msg        {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:18:49 UTC 2021'}
CreateTime:1624656092851        hello_msg        {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:21:31 UTC 2021'}
CreateTime:1624656142408        hello_msg        {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:21 UTC 2021'}
CreateTime:1624656149136        hello_msg        {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:27 UTC 2021'}
CreateTime:1624656159306        hello_msg        {'who':'kafka','where':'kafka5.example.org','when':'Fri Jun 25 21:22:38 UTC 2021'}
CreateTime:1624656197636        hello_msg        {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:16 UTC 2021'}
CreateTime:1624656214769        hello_msg        {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'}
CreateTime:1624656217408        hello_msg        {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'}
^CProcessed a total of 9 messages

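Delete messages up to a specific offset

Get the offset that corresponds to a specific timestamp, given in milliseconds since the Unix epoch. Partitions that hold no message at or after that timestamp return an empty offset.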

$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic keep-up-with-topic  --time 1624656214765
keep-up-with-topic:0:
keep-up-with-topic:1:
keep-up-with-topic:2:
keep-up-with-topic:3:7
keep-up-with-topic:4:
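
The value passed to --time is easier to derive from a readable date than to type by hand. A minimal sketch, assuming GNU date (for the -d option); to_epoch_ms is a hypothetical helper name, not part of Kafka:

```shell
# Convert a UTC date string to milliseconds since the Unix epoch.
# Assumes GNU date; -u makes both parsing and output use UTC.
to_epoch_ms() {
  echo $(( $(date -u -d "$1" +%s) * 1000 ))
}

# Prints 1624656214000, just below the CreateTime of the eighth message above.
to_epoch_ms '2021-06-25 21:23:34'
```

The result has second granularity only, so it may need a manual adjustment when two messages were produced within the same second.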

Create a JSON definition that reflects the above results, listing only the partitions that returned an offset.

$ cat <<EOF | tee keep-up-with-topic.offset.json
{
 "partitions": [
   {
     "topic": "keep-up-with-topic",
     "partition": 3,
     "offset": 7
   }
 ],
 "version": 1
}
EOF
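
With many partitions, writing this file by hand gets tedious. The following is a sketch of one way to build it from topic:partition:offset lines such as the ones printed by GetOffsetShell; offsets_to_json is a hypothetical helper name, and the sample input is copied from the output above rather than read from a live cluster:

```shell
# Turn "topic:partition:offset" lines into an offsets JSON document.
# Lines with an empty offset field are skipped.
offsets_to_json() {
  awk -F: '
    $3 != "" {
      entries[n++] = sprintf("   {\"topic\": \"%s\", \"partition\": %s, \"offset\": %s}", $1, $2, $3)
    }
    END {
      print "{"
      print " \"partitions\": ["
      # Emit a comma after every entry except the last one.
      for (i = 0; i < n; i++) print entries[i] (i < n - 1 ? "," : "")
      print " ],"
      print " \"version\": 1"
      print "}"
    }
  '
}

# Sample input taken from the GetOffsetShell output shown earlier.
printf '%s\n' 'keep-up-with-topic:0:' 'keep-up-with-topic:3:7' 'keep-up-with-topic:4:' | offsets_to_json
```

In practice the input would be the GetOffsetShell output from the previous step, with the result redirected to keep-up-with-topic.offset.json.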

Use it to delete messages on the selected partitions. Messages with offsets lower than the given offset are deleted; the message at that offset is kept.

$ bin/kafka-delete-records.sh --bootstrap-server localhost:9092  --offset-json-file keep-up-with-topic.offset.json
Executing records delete operation
Records delete operation completed:
partition: keep-up-with-topic-3 low_watermark: 7

Reset the consumer group offsets to the earliest available position so that the example consumer reads the topic from the beginning again.

$ bin/kafka-streams-application-reset.sh --bootstrap-servers localhost:9092 --input-topics keep-up-with-topic --application-id my-console-client --to-earliest
Reset-offsets for input topics [keep-up-with-topic]
Following input topics offsets will be reset to (for consumer group my-console-client)
Topic: keep-up-with-topic Partition: 4 Offset: 0
Topic: keep-up-with-topic Partition: 2 Offset: 0
Topic: keep-up-with-topic Partition: 3 Offset: 7
Topic: keep-up-with-topic Partition: 0 Offset: 0
Topic: keep-up-with-topic Partition: 1 Offset: 0
Done.
Deleting all internal/auto-created topics for application my-console-client
Done.

Inspect the remaining messages.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true  --property print.timestamp=true 
CreateTime:1624656214769        hello_msg        {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:33 UTC 2021'}
CreateTime:1624656217408        hello_msg        {'who':'kafka','where':'kafka4.example.org','when':'Fri Jun 25 21:23:36 UTC 2021'}
^CProcessed a total of 2 messages

Delete all messages

Create a definition that targets the most recent offset on every partition of the topic. An offset of -1 stands for the current high watermark of the partition.

$ cat <<EOF | tee keep-up-with-topic.all.json
{
 "partitions": [
   {
     "topic": "keep-up-with-topic",
     "partition": 0,
     "offset": -1
   },
   {
     "topic": "keep-up-with-topic",
     "partition": 1,
     "offset": -1
   },
   {
     "topic": "keep-up-with-topic",
     "partition": 2,
     "offset": -1
   },
   {
     "topic": "keep-up-with-topic",
     "partition": 3,
     "offset": -1
   },
   {
     "topic": "keep-up-with-topic",
     "partition": 4,
     "offset": -1
   }
 ],
 "version": 1
}
EOF
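
The same definition can be generated instead of typed out, which helps with topics that have many partitions. A minimal sketch, assuming partitions are numbered from 0 to count-1 as in the describe output above; all_partitions_json is a hypothetical helper name:

```shell
# Print a delete-records definition that targets partitions 0..count-1
# of a topic with offset -1 (the high watermark).
all_partitions_json() {
  topic=$1
  count=$2
  printf '{\n "partitions": [\n'
  i=0
  while [ "$i" -lt "$count" ]; do
    # Every entry except the last one is followed by a comma.
    if [ "$i" -lt $((count - 1)) ]; then sep=','; else sep=''; fi
    printf '   {"topic": "%s", "partition": %d, "offset": -1}%s\n' "$topic" "$i" "$sep"
    i=$((i + 1))
  done
  printf ' ],\n "version": 1\n}\n'
}

all_partitions_json keep-up-with-topic 5
```

Redirecting the output to keep-up-with-topic.all.json gives a file equivalent to the one created above.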

Use it to delete all messages.

$ bin/kafka-delete-records.sh --bootstrap-server localhost:9092  --offset-json-file keep-up-with-topic.all.json
Executing records delete operation
Records delete operation completed:
partition: keep-up-with-topic-2 low_watermark: 0
partition: keep-up-with-topic-3 low_watermark: 9
partition: keep-up-with-topic-0 low_watermark: 0
partition: keep-up-with-topic-1 low_watermark: 0
partition: keep-up-with-topic-4 low_watermark: 0

Reset the consumer group offsets to the earliest available position again.

$ bin/kafka-streams-application-reset.sh --bootstrap-servers localhost:9092 --input-topics keep-up-with-topic --application-id my-console-client --to-earliest
Reset-offsets for input topics [keep-up-with-topic]
Following input topics offsets will be reset to (for consumer group my-console-client)
Topic: keep-up-with-topic Partition: 4 Offset: 0
Topic: keep-up-with-topic Partition: 2 Offset: 0
Topic: keep-up-with-topic Partition: 3 Offset: 9
Topic: keep-up-with-topic Partition: 0 Offset: 0
Topic: keep-up-with-topic Partition: 1 Offset: 0
Done.
Deleting all internal/auto-created topics for application my-console-client
Done.

No messages remain in the topic.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic keep-up-with-topic --group my-console-client --property print.key=true  --property print.timestamp=true 
^CProcessed a total of 0 messages