Categories
DevOps

How to perform basic console operations on Kafka cluster

Perform basic console operations on Kafka cluster.

Topics

Create first-topic with replication factor 1, single partition, and short retention time.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first-topic --replication-factor 1 --partitions 1 -config retention.ms=180000
Created topic first-topic.

Create second-topic if it does not exist with replication factor 2 and two partitions.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --if-not-exists --topic second-topic --replication-factor 2 --partitions 2
Created topic second-topic.

Create third-topic with replication factor 3 and six partitions.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic third-topic --replication-factor 3 --partitions 6
Created topic third-topic.

List topics.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
first-topic
second-topic
third-topic

Describe all topics.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic: first-topic      TopicId: ifRwctrESCq5iZ1Trg69rA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824,retention.ms=180000
        Topic: first-topic      Partition: 0    Leader: 3       Replicas: 3     Isr: 3
Topic: second-topic     TopicId: lPQbW6WpRsuw7Gu-gSSzHQ PartitionCount: 2       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: second-topic     Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: second-topic     Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
Topic: third-topic      TopicId: dWWsNCh1SLuJb684jtpvtw PartitionCount: 6       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: third-topic      Partition: 0    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
        Topic: third-topic      Partition: 1    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: third-topic      Partition: 2    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: third-topic      Partition: 3    Leader: 1       Replicas: 1,3,2 Isr: 1,3,2
        Topic: third-topic      Partition: 4    Leader: 2       Replicas: 2,1,3 Isr: 2,1,3
        Topic: third-topic      Partition: 5    Leader: 3       Replicas: 3,2,1 Isr: 3,2,1

Update retention time on the first-topic.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic first-topic --add-config retention.ms=300000
Completed updating config for topic first-topic.

Display configuration for the first-topic.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --topic first-topic
Dynamic configs for topic first-topic are:
  retention.ms=300000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=300000}

Display detailed configuration for the first-topic.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --all --topic first-topic 
All configs for topic first-topic are:
  compression.type=producer sensitive=false synonyms={DEFAULT_CONFIG:compression.type=producer}
  leader.replication.throttled.replicas= sensitive=false synonyms={}
  message.downconversion.enable=true sensitive=false synonyms={DEFAULT_CONFIG:log.message.downconversion.enable=true}
  min.insync.replicas=1 sensitive=false synonyms={DEFAULT_CONFIG:min.insync.replicas=1}
  segment.jitter.ms=0 sensitive=false synonyms={}
  cleanup.policy=delete sensitive=false synonyms={DEFAULT_CONFIG:log.cleanup.policy=delete}
  flush.ms=9223372036854775807 sensitive=false synonyms={}
  follower.replication.throttled.replicas= sensitive=false synonyms={}
  segment.bytes=1073741824 sensitive=false synonyms={STATIC_BROKER_CONFIG:log.segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=300000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=300000}
  flush.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
  message.format.version=2.8-IV1 sensitive=false synonyms={DEFAULT_CONFIG:log.message.format.version=2.8-IV1}
  max.compaction.lag.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.max.compaction.lag.ms=9223372036854775807}
  file.delete.delay.ms=60000 sensitive=false synonyms={DEFAULT_CONFIG:log.segment.delete.delay.ms=60000}
  max.message.bytes=1048588 sensitive=false synonyms={DEFAULT_CONFIG:message.max.bytes=1048588}
  min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
  message.timestamp.type=CreateTime sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.type=CreateTime}
  preallocate=false sensitive=false synonyms={DEFAULT_CONFIG:log.preallocate=false}
  min.cleanable.dirty.ratio=0.5 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.cleanable.ratio=0.5}
  index.interval.bytes=4096 sensitive=false synonyms={DEFAULT_CONFIG:log.index.interval.bytes=4096}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  retention.bytes=-1 sensitive=false synonyms={DEFAULT_CONFIG:log.retention.bytes=-1}
  delete.retention.ms=86400000 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.delete.retention.ms=86400000}
  segment.ms=604800000 sensitive=false synonyms={}
  message.timestamp.difference.max.ms=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.message.timestamp.difference.max.ms=9223372036854775807}
  segment.index.bytes=10485760 sensitive=false synonyms={DEFAULT_CONFIG:log.index.size.max.bytes=10485760}

Delete configuration option.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --topic first-topic --delete-config retention.ms
Completed updating config for topic first-topic.

Increase partition number on the second-topic.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic second-topic  --partitions 3

Describe the second-topic.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second-topic
Topic: second-topic     TopicId: lPQbW6WpRsuw7Gu-gSSzHQ PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: second-topic     Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: second-topic     Partition: 1    Leader: 2       Replicas: 2,3   Isr: 2,3
        Topic: second-topic     Partition: 2    Leader: 3       Replicas: 3,1   Isr: 3,1

Delete third-topic topic.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic third-topic

Messages

Produce a message on the first-topic.

$ date | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first-topic

Capture new messages on the first-topic using console-consumer-52888 group.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --group console-consumer-52888
Fri Jun 25 12:40:08 UTC 2021
[...]

Capture all available messages on the first-topic.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first-topic --from-beginning
Fri Jun 25 12:39:29 UTC 2021
Fri Jun 25 12:40:08 UTC 2021
[...]

Produce key-value pairs.

$ echo  "hello_msg: {'who':'$(whoami)','where':'$(hostname)','when':'$(date)'}" | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic third-topic --property parse.key=true --property key.separator=:

Consume key-value pairs.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic third-topic --group console-consumer-52888 --property print.key=true  --property print.timestamp=true 
CreateTime:1624631123237        hello_msg        {'who':'kafka','where':'kafka3.example.org','when':'Fri Jun 25 14:25:21 UTC 2021'}
[...]

Consumers

List consumer groups.

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
console-consumer-52888
console-consumer-26513
console-consumer-37173
console-consumer-89198
console-consumer-34365

Describe all consumer groups.

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --all-groups
Consumer group 'console-consumer-26513' has no active members.

Consumer group 'console-consumer-34365' has no active members.

Consumer group 'console-consumer-37173' has no active members.

GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-52888 first-topic     0          -               6               -               consumer-console-consumer-52888-1-67ccaca5-33fe-48b1-987a-fbd09ba6eada /172.16.0.101   consumer-console-consumer-52888-1

Consumer group 'console-consumer-89198' has no active members.

List consumers in a particular consumer group.

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group console-consumer-52888
GROUP                  TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                            HOST            CLIENT-ID
console-consumer-52888 first-topic     0          13              13              0               consumer-console-consumer-52888-1-854cf0c8-6314-44a4-af88-1c9bf607b690 /172.16.0.101   consumer-console-consumer-52888-1