Determine and restore the preferred leader for a specific Kafka topic.
Create a topic
Create a sample topic.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic hydra --replication-factor 3 --partitions 10
List topics.
$ bin/kafka-topics.sh --bootstrap-server kafka1.example.org:9092 --list --exclude-internal
hydra
Create a check_preferred_leader.sh shell script that takes a topic as a parameter and determines whether the preferred leader is used.
#!/bin/bash
# check_preferred_leader.sh
# Article at https://sleeplessbeastie.eu/2022/02/23/how-to-determine-and-restore-preferred-leader-for-specific-kafka-topic/
#
# param:
#   topic

kafka_topics_script="/opt/kafka/kafka/bin/kafka-topics.sh"
bootstrap_server="localhost:9092"

if [ "$#" -eq "1" ]; then
  topic_details="$($kafka_topics_script --bootstrap-server $bootstrap_server --describe --topic $1 2>/dev/null)"
  if [ "$?" -eq "0" ]; then
    echo "Topic: $1"
    echo -e "$topic_details" | \
      awk -F"\t" '/^\tTopic/ {
        topic=$2; sub(/Topic: /, "", topic);
        partition=$3; sub(/Partition: /, "", partition);
        leader=$4; sub(/Leader: /, "", leader);
        replicas=$5; sub(/Replicas: /, "", replicas);
        split(replicas, replicas_array, ",");
        if(leader == replicas_array[1]) {
          preferred_replica_msg=""
        } else {
          preferred_replica_msg=" (the leader is not the first replica) "
        }
        print "Partition " partition " with leader " leader " and replicas " replicas preferred_replica_msg
      }'
  else
    exit 1
  fi
fi
Create a create_leader_election_json.sh shell script that takes a topic as a parameter and creates a JSON file for preferred leader election.
#!/bin/bash
# create_leader_election_json.sh
# Article at https://sleeplessbeastie.eu/2022/02/23/how-to-determine-and-restore-preferred-leader-for-specific-kafka-topic/
#
# param:
#   topic

kafka_topics_script="/opt/kafka/kafka/bin/kafka-topics.sh"
bootstrap_server="localhost:9092"

if [ "$#" -eq "1" ]; then
  topic_details="$($kafka_topics_script --bootstrap-server $bootstrap_server --describe --topic $1 2>/dev/null)"
  if [ "$?" -eq "0" ]; then
    output_lines="$(echo -e "$topic_details" | \
      awk -F"\t" \
        'BEGIN{lines=0}
         /^\tTopic/ {
           leader=$4; sub(/Leader: /, "", leader);
           replicas=$5; sub(/Replicas: /, "", replicas);
           split(replicas, replicas_array, ",");
           if(leader != replicas_array[1]) {
             lines++
           }
         }
         END{print lines}'
    )"
    if [ "$output_lines" -gt "0" ]; then
      echo '{"partitions": ['
      echo -e "$topic_details" | \
        awk -F"\t" \
          -v lines=$output_lines \
          'BEGIN{current_line=0}
           /^\tTopic/ {
             topic=$2; sub(/Topic: /, "", topic);
             partition=$3; sub(/Partition: /, "", partition);
             leader=$4; sub(/Leader: /, "", leader);
             replicas=$5; sub(/Replicas: /, "", replicas);
             split(replicas, replicas_array, ",");
             if(leader != replicas_array[1]) {
               current_line++
               if(current_line < lines) {
                 comma_msg=","
               } else {
                 comma_msg=""
               }
               print "{\"topic\": \"" topic "\", \"partition\": " partition "}" comma_msg
             }
           }'
      echo ']}'
    fi
  else
    exit 1
  fi
fi
Determine preferred leader
List the topic details to determine the preferred leader. The rule is that the leader should be the first replica in the Replicas list.
$ bin/kafka-topics.sh --bootstrap-server kafka1.example.org:9092 --describe --topic hydra
Topic: hydra  PartitionCount: 10  ReplicationFactor: 3  Configs: segment.bytes=1073741824,message.format.version=2.0-IV1
    Topic: hydra  Partition: 0  Leader: 2  Replicas: 2,4,3  Isr: 2,4,3
    Topic: hydra  Partition: 1  Leader: 3  Replicas: 3,2,5  Isr: 3,2,5
    Topic: hydra  Partition: 2  Leader: 5  Replicas: 5,3,1  Isr: 5,3,1
    Topic: hydra  Partition: 3  Leader: 1  Replicas: 1,5,4  Isr: 1,5,4
    Topic: hydra  Partition: 4  Leader: 4  Replicas: 4,1,2  Isr: 4,1,2
    Topic: hydra  Partition: 5  Leader: 2  Replicas: 2,3,5  Isr: 2,3,5
    Topic: hydra  Partition: 6  Leader: 3  Replicas: 3,5,1  Isr: 3,5,1
    Topic: hydra  Partition: 7  Leader: 5  Replicas: 5,1,4  Isr: 5,1,4
    Topic: hydra  Partition: 8  Leader: 1  Replicas: 1,4,2  Isr: 1,4,2
    Topic: hydra  Partition: 9  Leader: 4  Replicas: 4,2,3  Isr: 4,2,3
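The rule can be applied to individual lines of the describe output. The following is a minimal sketch, not part of the original scripts: preferred_leaders is a hypothetical helper name, and it assumes the whitespace-separated "Label: value" layout shown above.

```shell
#!/bin/bash
# Sketch: for each partition line of `kafka-topics.sh --describe` output
# read from stdin, print the current leader and the preferred leader
# (the first entry of the Replicas list).
# preferred_leaders is a hypothetical helper name, not a Kafka tool.
preferred_leaders() {
  awk '
    # Partition lines have "Partition:" as the third whitespace-separated
    # field; the topic summary line does not, so it is skipped.
    $3 == "Partition:" {
      for (i = 1; i < NF; i++) {
        if ($i == "Partition:") partition = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
      }
      split(replicas, r, ",")
      printf "partition %s: leader %s, preferred %s\n", partition, leader, r[1]
    }
  '
}

# Sample lines in the format shown above.
sample='Topic: hydra PartitionCount: 10 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: hydra Partition: 0 Leader: 2 Replicas: 2,4,3 Isr: 2,4,3
Topic: hydra Partition: 1 Leader: 3 Replicas: 3,2,5 Isr: 3,2,5'

printf '%s\n' "$sample" | preferred_leaders
```

In a real setup the function would read the output of the describe command through a pipe instead of the sample variable.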
Break things
Restart the broker with id 5 to see the leader change for the partitions it was leading.
$ ./bin/kafka-topics.sh --bootstrap-server kafka1.example.org:9092 --describe --topic hydra
Topic: hydra  PartitionCount: 10  ReplicationFactor: 3  Configs: segment.bytes=1073741824,message.format.version=2.0-IV1
    Topic: hydra  Partition: 0  Leader: 2  Replicas: 2,4,3  Isr: 4,3,2
    Topic: hydra  Partition: 1  Leader: 3  Replicas: 3,2,5  Isr: 3,2,5
    Topic: hydra  Partition: 2  Leader: 3  Replicas: 5,3,1  Isr: 3,1,5
    Topic: hydra  Partition: 3  Leader: 1  Replicas: 1,5,4  Isr: 1,4,5
    Topic: hydra  Partition: 4  Leader: 4  Replicas: 4,1,2  Isr: 1,2,4
    Topic: hydra  Partition: 5  Leader: 2  Replicas: 2,3,5  Isr: 3,2,5
    Topic: hydra  Partition: 6  Leader: 3  Replicas: 3,5,1  Isr: 3,1,5
    Topic: hydra  Partition: 7  Leader: 1  Replicas: 5,1,4  Isr: 1,4,5
    Topic: hydra  Partition: 8  Leader: 1  Replicas: 1,4,2  Isr: 1,2,4
    Topic: hydra  Partition: 9  Leader: 4  Replicas: 4,2,3  Isr: 2,3,4
Use the check_preferred_leader.sh shell script to programmatically determine whether each partition is led by its preferred leader.
$ bash check_preferred_leader.sh hydra
Topic: hydra
Partition 0 with leader 2 and replicas 2,4,3
Partition 1 with leader 3 and replicas 3,2,5
Partition 2 with leader 3 and replicas 5,3,1 (the leader is not the first replica)
Partition 3 with leader 1 and replicas 1,5,4
Partition 4 with leader 4 and replicas 4,1,2
Partition 5 with leader 2 and replicas 2,3,5
Partition 6 with leader 3 and replicas 3,5,1
Partition 7 with leader 1 and replicas 5,1,4 (the leader is not the first replica)
Partition 8 with leader 1 and replicas 1,4,2
Partition 9 with leader 4 and replicas 4,2,3
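For unattended checks, a single number may be easier to act on than a per-partition listing. The sketch below is an illustration only: count_non_preferred is a hypothetical helper, and the sample lines stand in for real describe output.

```shell
#!/bin/bash
# Sketch: count partitions whose current leader is not the first replica.
# count_non_preferred is a hypothetical helper; it reads
# `kafka-topics.sh --describe` output on stdin and prints one number.
count_non_preferred() {
  awk '
    $3 == "Partition:" {
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:")   leader = $(i + 1)
        if ($i == "Replicas:") replicas = $(i + 1)
      }
      split(replicas, r, ",")
      if (leader != r[1]) count++
    }
    END { print count + 0 }   # "+ 0" prints 0 when nothing matched
  '
}

# Sample lines taken from the describe output after the broker restart.
sample='Topic: hydra Partition: 1 Leader: 3 Replicas: 3,2,5 Isr: 3,2,5
Topic: hydra Partition: 2 Leader: 3 Replicas: 5,3,1 Isr: 3,1,5
Topic: hydra Partition: 7 Leader: 1 Replicas: 5,1,4 Isr: 1,4,5'

non_preferred="$(printf '%s\n' "$sample" | count_non_preferred)"
if [ "$non_preferred" -gt 0 ]; then
  echo "WARNING: $non_preferred partition(s) not led by the preferred replica"
else
  echo "OK: all partitions are led by the preferred replica"
fi
```

The printed message could be replaced with a non-zero exit status if the check is meant to feed a monitoring system.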
Restore preferred leader for every topic
Initiate leader election for every topic.
$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --all-topic-partitions
Restore preferred leader for specific topic partitions
Initiate leader election for specific topic partitions.
$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --topic hydra --partition 5
Successfully completed leader election (PREFERRED) for partitions hydra-5
$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --topic hydra --partition 7
Successfully completed leader election (PREFERRED) for partitions hydra-7
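When several partitions need an election, the per-partition command can be generated in a loop. This is a dry-run sketch that only prints the commands; election_commands is a hypothetical helper, and the flags are the ones used in the commands above.

```shell
#!/bin/bash
# Sketch: print one kafka-leader-election.sh command per partition number.
# election_commands is a hypothetical helper. It only prints commands
# (dry run), so nothing is executed against the cluster.
# Usage: election_commands <bootstrap-server> <topic> <partition>...
election_commands() {
  local bootstrap_server="$1" topic="$2"
  shift 2
  local partition
  for partition in "$@"; do
    printf '%s\n' "./bin/kafka-leader-election.sh --bootstrap-server $bootstrap_server --election-type preferred --topic $topic --partition $partition"
  done
}

election_commands kafka1.example.org:9092 hydra 2 7
```

Review the printed commands before running them, for example by pasting them into the shell one at a time.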
Restore preferred leader for specific topic
Generate a JSON file that lists the partitions that require leader election.
$ bash create_leader_election_json.sh hydra | tee /tmp/hydra.json
{"partitions": [
{"topic": "hydra", "partition": 2},
{"topic": "hydra", "partition": 7}
]}
Execute leader election.
$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --path-to-json-file /tmp/hydra.json
Successfully completed leader election (PREFERRED) for partitions hydra-2, hydra-7
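The create_leader_election_json.sh script reads the topic details twice, first to count the affected partitions and then to place the commas. As an alternative sketch, the same JSON can be built in one pass by emitting the separator before every entry except the first. election_json is a hypothetical helper, and it assumes the whitespace-separated "Label: value" layout of the describe output.

```shell
#!/bin/bash
# Sketch: build the election JSON in a single awk pass.
# election_json is a hypothetical helper; it reads
# `kafka-topics.sh --describe` output on stdin and prints nothing
# when every partition is already led by its first replica.
election_json() {
  awk '
    $3 == "Partition:" {
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:")     topic = $(i + 1)
        if ($i == "Partition:") partition = $(i + 1)
        if ($i == "Leader:")    leader = $(i + 1)
        if ($i == "Replicas:")  replicas = $(i + 1)
      }
      split(replicas, r, ",")
      if (leader != r[1]) {
        # sep is empty for the first entry and ", " afterwards.
        entries = entries sep "{\"topic\": \"" topic "\", \"partition\": " partition "}"
        sep = ", "
      }
    }
    END { if (entries != "") print "{\"partitions\": [" entries "]}" }
  '
}

# Sample lines taken from the describe output after the broker restart.
sample='Topic: hydra Partition: 1 Leader: 3 Replicas: 3,2,5 Isr: 3,2,5
Topic: hydra Partition: 2 Leader: 3 Replicas: 5,3,1 Isr: 3,1,5
Topic: hydra Partition: 7 Leader: 1 Replicas: 5,1,4 Isr: 1,4,5'

printf '%s\n' "$sample" | election_json
# prints: {"partitions": [{"topic": "hydra", "partition": 2}, {"topic": "hydra", "partition": 7}]}
```

The output is on one line rather than four, which is still valid JSON with the same structure.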
Additional notes
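Kafka performs preferred leader election automatically when auto.leader.rebalance.enable is true. By default, the controller checks for leader imbalance every 300 seconds (leader.imbalance.check.interval.seconds) and triggers an election for a broker when its imbalance exceeds 10% (leader.imbalance.per.broker.percentage). Inspect the default parameters.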
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe --all | grep "leader\|All"
All configs for broker 1 are:
  auto.leader.rebalance.enable=true sensitive=false synonyms={DEFAULT_CONFIG:auto.leader.rebalance.enable=true}
  leader.imbalance.check.interval.seconds=300 sensitive=false synonyms={DEFAULT_CONFIG:leader.imbalance.check.interval.seconds=300}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  leader.imbalance.per.broker.percentage=10 sensitive=false synonyms={DEFAULT_CONFIG:leader.imbalance.per.broker.percentage=10}
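To get a feeling for leader.imbalance.per.broker.percentage, the imbalance can be estimated from the describe output. The sketch below rests on an assumption about the calculation: for each broker, it takes the share of partitions that list the broker as the first replica but are currently led by another broker. The controller works across all topics in the cluster, so a single-topic estimate is only an approximation. imbalance_per_broker is a hypothetical helper.

```shell
#!/bin/bash
# Sketch: estimate the per-broker leader imbalance for the partitions
# read from stdin (`kafka-topics.sh --describe` output).
# Assumption: imbalance = partitions preferring the broker but led by
# another broker, divided by all partitions preferring the broker.
# imbalance_per_broker is a hypothetical helper, not a Kafka tool.
imbalance_per_broker() {
  awk '
    $3 == "Partition:" {
      for (i = 1; i < NF; i++) {
        if ($i == "Leader:")   leader = $(i + 1)
        if ($i == "Replicas:") replicas = $(i + 1)
      }
      split(replicas, r, ",")
      preferred[r[1]]++
      if (leader != r[1]) displaced[r[1]]++
    }
    END {
      for (b in preferred)
        printf "broker %s: %d%%\n", b, 100 * displaced[b] / preferred[b]
    }
  ' | sort   # awk iterates arrays in unspecified order
}

# Sample lines taken from the describe output after the broker restart.
sample='Topic: hydra Partition: 2 Leader: 3 Replicas: 5,3,1 Isr: 3,1,5
Topic: hydra Partition: 3 Leader: 1 Replicas: 1,5,4 Isr: 1,4,5
Topic: hydra Partition: 7 Leader: 1 Replicas: 5,1,4 Isr: 1,4,5
Topic: hydra Partition: 8 Leader: 1 Replicas: 1,4,2 Isr: 1,2,4'

printf '%s\n' "$sample" | imbalance_per_broker
```

For the sample lines, broker 5 is the first replica of partitions 2 and 7 but leads neither, so its estimated imbalance is well above the 10% default threshold.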