Perform basic console operations on a Kafka cluster.
Topics
Create first-topic with replication factor 1, a single partition, and a short retention time.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic --replication-factor 1 --partitions 1 --config retention.ms=180000
Created topic first-topic.
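The retention.ms setting is expressed in milliseconds, so the value 180000 used above corresponds to three minutes. A minimal sketch of that conversion follows; minutes_to_ms is a hypothetical helper name chosen for illustration, not part of the Kafka tooling.

```shell
# Hypothetical helper (not a Kafka tool): convert minutes to the
# millisecond value that retention.ms expects.
minutes_to_ms() {
  echo $(( $1 * 60 * 1000 ))
}

minutes_to_ms 3   # prints 180000
```

The result can be substituted into the command line, for example --config retention.ms=$(minutes_to_ms 3).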
Create second-topic, if it does not already exist, with replication factor 2 and two partitions.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --if-not-exists --topic second-topic --replication-factor 2 --partitions 2
Created topic second-topic.
Create third-topic with replication factor 3 and six partitions.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic third-topic --replication-factor 3 --partitions 6
Created topic third-topic.
List topics.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
first-topic
second-topic
third-topic
Describe all topics.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: first-topic TopicId: ifRwctrESCq5iZ1Trg69rA PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=180000
    Topic: first-topic Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: second-topic TopicId: lPQbW6WpRsuw7Gu-gSSzHQ PartitionCount: 2 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: second-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: second-topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: third-topic TopicId: dWWsNCh1SLuJb684jtpvtw PartitionCount: 6 ReplicationFactor: 3 Configs: segment.bytes=1073741824
    Topic: third-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
    Topic: third-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
    Topic: third-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
    Topic: third-topic Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
    Topic: third-topic Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
    Topic: third-topic Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
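The describe output lists a Leader broker for every partition, which makes it easy to check how evenly leadership is spread across brokers. The following is a rough sketch that counts partitions per leader; count_leaders is a hypothetical helper name, and the sample lines are the third-topic partition lines from the output above.

```shell
# Hypothetical helper (not a Kafka tool): read `kafka-topics.sh --describe`
# output on stdin and print "<broker id> <number of partitions led>".
count_leaders() {
  awk '{
         # Find the "Leader:" label and count the broker id that follows it.
         for (i = 1; i < NF; i++)
           if ($i == "Leader:") leaders[$(i + 1)]++
       }
       END { for (broker in leaders) print broker, leaders[broker] }' | sort -n
}

# Partition lines for third-topic, taken from the describe output above.
sample='Topic: third-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: third-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: third-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: third-topic Partition: 3 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2
Topic: third-topic Partition: 4 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: third-topic Partition: 5 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1'

printf '%s\n' "$sample" | count_leaders
```

Against a live cluster, the same function could presumably be fed directly: bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic third-topic | count_leaders.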
Update the retention time on first-topic.
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic first-topic --add-config retention.ms=300000
Completed updating config for topic first-topic.
Display the configuration for first-topic.
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --topic first-topic
Dynamic configs for topic first-topic are:
  retention.ms=300000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=300000}
Display the detailed configuration for first-topic.
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --all --topic first-topic
All configs for topic first-topic are:
  compression.type=producer sensitive=false synonyms={DEFAULT_CONFIG:compression.type=producer}
  leader.replication.throttled.replicas= sensitive=false synonyms={}
  message.downconversion.enable=true sensitive=false synonyms={DEFAULT_CONFIG:log.message.downconversion.enable=true}
  min.insync.replicas=1 sensitive=false synonyms={DEFAULT_CONFIG:min.insync.replicas=1}
  segment.jitter.ms=0 sensitive=false synonyms={}
  cleanup.policy=delete sensitive=false synonyms={DEFAULT_CONFIG:log.cleanup.policy=delete}
  flush.ms=9223372036854775807 sensitive=false synonyms={}
  follower.replication.throttled.replicas= sensitive=false synonyms={}
  segment.bytes=1073741824 sensitive=false synonyms={STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=300000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=300000}
  flush.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
  message.format.version=2.8-IV1 sensitive=false synonyms={DEFAULT_CONFIG:log.message.format.version=2.8-IV1}
  max.compaction.lag.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.max.compaction.lag.ms=9223372036854775807}
  file.delete.delay.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.delete.delay.ms=60000}
  max.message.bytes=1048588 sensitive=false synonyms={DEFAULT_CONFIG:message.max.bytes=1048588}
  min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
  message.timestamp.type=CreateTime sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.type=CreateTime}
  preallocate=false sensitive=false synonyms={DEFAULT_CONFIG:log.preallocate=false}
  min.cleanable.dirty.ratio=0.5 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
  index.interval.bytes=4096 sensitive=false synonyms={DEFAULT_CONFIG:log.index.interval.bytes=4096}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  retention.bytes=-1 sensitive=false synonyms={DEFAULT_CONFIG:log.retention.bytes=-1}
  delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
  segment.ms=604800000 sensitive=false synonyms={}
  message.timestamp.difference.max.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.difference.max.ms=9223372036854775807}
  segment.index.bytes=10485760 sensitive=false synonyms={DEFAULT_CONFIG:log.index.size.max.bytes=10485760}
Delete a configuration option (here retention.ms on first-topic), which reverts it to the default.
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic first-topic --delete-config retention.ms
Completed updating config for topic first-topic.
Increase the number of partitions on second-topic to three.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic second-topic --partitions 3
Describe second-topic.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second-topic
Topic: second-topic TopicId: lPQbW6WpRsuw7Gu-gSSzHQ PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
    Topic: second-topic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
    Topic: second-topic Partition: 1 Leader: 2 Replicas: 2,3 Isr: 2,3
    Topic: second-topic Partition: 2 Leader: 3 Replicas: 3,1 Isr: 3,1
Delete third-topic.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic third-topic
Messages
Produce a message on first-topic.
$ date | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic
Capture new messages on first-topic using the console-consumer-52888 group.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group console-consumer-52888
Fri Jun 25 12:40:08 UTC 2021
[...]
Capture all available messages on first-topic.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning
Fri Jun 25 12:39:29 UTC 2021
Fri Jun 25 12:40:08 UTC 2021
[...]
Produce key-value pairs, using a colon as the key separator.
$ echo "hello_msg: {'who':'$(whoami)','where':'$(hostname)','when':'$(date)'}" | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic third-topic --property parse.key=true --property key.separator=:
Consume key-value pairs, printing the key and timestamp of each message.
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic third-topic --group console-consumer-52888 --property print.key=true --property print.timestamp=true
CreateTime:1624631123237 hello_msg {'who':'kafka','where':'kafka3.example.org','when':'Fri Jun 25 14:25:21 UTC 2021'}
[...]
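The CreateTime value printed with print.timestamp=true is the message timestamp in milliseconds since the Unix epoch. As a rough sketch, it can be turned into a readable UTC time as shown here; epoch_ms_to_utc is a hypothetical helper name, and the -d @SECONDS form assumes GNU date (BSD and macOS date use a different option).

```shell
# Hypothetical helper (not a Kafka tool): convert a Kafka CreateTime value
# (milliseconds since the Unix epoch) to a UTC timestamp.
# Assumes GNU date; the millisecond remainder is dropped.
epoch_ms_to_utc() {
  date -u -d "@$(( $1 / 1000 ))" '+%Y-%m-%d %H:%M:%S'
}

epoch_ms_to_utc 1624631123237   # prints 2021-06-25 14:25:23
```

The result lands two seconds after the 'when' field embedded in the message, which fits the payload being built just before the producer sent it.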
Consumers
List consumer groups.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-52888
console-consumer-26513
console-consumer-37173
console-consumer-89198
console-consumer-34365
Describe all consumer groups.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
Consumer group 'console-consumer-26513' has no active members.
Consumer group 'console-consumer-34365' has no active members.
Consumer group 'console-consumer-37173' has no active members.
GROUP                   TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                             HOST           CLIENT-ID
console-consumer-52888  first-topic  0          -               6               -    consumer-console-consumer-52888-1-67ccaca5-33fe-48b1-987a-fbd09ba6eada  /172.16.0.101  consumer-console-consumer-52888-1
Consumer group 'console-consumer-89198' has no active members.
Describe a particular consumer group, listing its consumers and their offsets.
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-52888
GROUP                   TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                             HOST           CLIENT-ID
console-consumer-52888  first-topic  0          13              13              0    consumer-console-consumer-52888-1-854cf0c8-6314-44a4-af88-1c9bf607b690  /172.16.0.101  consumer-console-consumer-52888-1
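In this output LAG is the difference between LOG-END-OFFSET and CURRENT-OFFSET for each partition, so a group that has caught up shows 0. A minimal sketch for summing the lag across all rows follows; total_lag is a hypothetical helper name, and the second data row in the sample is made up purely for illustration.

```shell
# Hypothetical helper (not a Kafka tool): sum the LAG column (6th field) of
# `kafka-consumer-groups.sh --describe` output read from stdin.
# Header lines, "no active members" lines, and rows with "-" are skipped
# because their 6th field is not a plain number.
total_lag() {
  awk '$6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# First data row is from the output above; the example-group row is invented.
sample='GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
console-consumer-52888 first-topic 0 13 13 0 consumer-console-consumer-52888-1-854cf0c8-6314-44a4-af88-1c9bf607b690 /172.16.0.101 consumer-console-consumer-52888-1
example-group example-topic 0 4 6 2 - - -'

printf '%s\n' "$sample" | total_lag   # prints 2
```

With a running cluster, the describe command above could be piped into total_lag in the same way.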