Determine and restore the preferred leader for a specific Kafka topic.

Create a topic

Create a sample topic.

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic hydra --replication-factor 3 --partitions 10

List topics.

$ bin/kafka-topics.sh --bootstrap-server kafka1.example.org:9092 --list --exclude-internal
hydra

Create a check_preferred_leader.sh shell script that takes a topic as a parameter and reports, for each partition, whether the preferred leader is in use.

#!/bin/bash
# check_preferred_leader.sh
# Article at https://sleeplessbeastie.eu/2022/02/23/how-to-determine-and-restore-preferred-leader-for-specific-kafka-topic/
#
# param:
#   topic 

kafka_topics_script="/opt/kafka/kafka/bin/kafka-topics.sh"
bootstrap_server="localhost:9092"

if [ "$#" -eq "1" ]; then
  topic_details="$("$kafka_topics_script" --bootstrap-server "$bootstrap_server" --describe --topic "$1" 2>/dev/null)"
  if [ "$?" -eq "0" ]; then
    echo "Topic: $1"
    echo -e "$topic_details" | \
      awk -F"\t" '/^\tTopic/ { 
                               topic=$2; sub(/Topic: /, "", topic);
                               partition=$3; sub(/Partition: /, "", partition);
                               leader=$4; sub(/Leader: /, "", leader);
                               replicas=$5; sub(/Replicas: /, "", replicas); split(replicas, replicas_array, ",");
                               if(leader == replicas_array[1]) {
                                 preferred_replica_msg=""
                               } else {
                                 preferred_replica_msg=" (the leader is not the first replica) "
                               }
                               print "Partition " partition " with leader " leader " and replicas " replicas preferred_replica_msg
                             }'
  else
    exit 1
  fi
else
  # Exactly one argument (the topic name) is required
  echo "Usage: $0 <topic>" >&2
  exit 1
fi

Create a create_leader_election_json.sh shell script that takes a topic as a parameter and creates the JSON input for a preferred leader election.

#!/bin/bash
# create_leader_election_json.sh
# Article at https://sleeplessbeastie.eu/2022/02/23/how-to-determine-and-restore-preferred-leader-for-specific-kafka-topic/
#
# param:
#   topic 

kafka_topics_script="/opt/kafka/kafka/bin/kafka-topics.sh"
bootstrap_server="localhost:9092"

if [ "$#" -eq "1" ]; then
  topic_details="$("$kafka_topics_script" --bootstrap-server "$bootstrap_server" --describe --topic "$1" 2>/dev/null)"
  if [ "$?" -eq "0" ]; then
    output_lines="$(echo -e "$topic_details" | \
                      awk -F"\t" \
                                 'BEGIN{lines=0}
                                 /^\tTopic/ { 
                                   leader=$4; sub(/Leader: /, "", leader);
                                   replicas=$5; sub(/Replicas: /, "", replicas); split(replicas, replicas_array, ",");
                                   if(leader != replicas_array[1]) {
                                     lines++
                                   }
                                 } 
                                 END{print lines}'
    )"

    if [ "$output_lines" -gt "0" ]; then
      echo '{"partitions": ['
      echo -e "$topic_details" | \
        awk -F"\t" \
            -v lines=$output_lines \
                                   'BEGIN{current_line=0} 
                                   /^\tTopic/ { 
                                     topic=$2; sub(/Topic: /, "", topic);
                                     partition=$3; sub(/Partition: /, "", partition);
                                     leader=$4; sub(/Leader: /, "", leader);
                                     replicas=$5; sub(/Replicas: /, "", replicas); split(replicas, replicas_array, ",");
                                     if(leader != replicas_array[1]) {
                                       current_line++
                                       if(current_line < lines) { 
                                         comma_msg=","
                                       } else {
                                         comma_msg=""
                                       }
                                       print "{\"topic\": \"" topic "\", \"partition\": " partition "}" comma_msg
                                     }
                                   }'
      echo ']}'
    fi
  else
    exit 1
  fi
else
  # Exactly one argument (the topic name) is required
  echo "Usage: $0 <topic>" >&2
  exit 1
fi
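The script above scans the topic details twice so that it knows where the last comma belongs. As a sketch of an alternative, a single awk pass can put the separator in front of every entry except the first. The function name election_json and the sample rows are assumptions made for illustration; the rows imitate the tab-separated layout of the describe output.

```shell
#!/bin/bash
# Sketch only: election_json is a hypothetical helper, not part of Kafka.
# It reads `kafka-topics.sh --describe` output on stdin.
election_json() {
  awk -F"\t" '
    /^\tTopic/ {
      topic=$2;     sub(/Topic: /, "", topic)
      partition=$3; sub(/Partition: /, "", partition)
      leader=$4;    sub(/Leader: /, "", leader)
      replicas=$5;  sub(/Replicas: /, "", replicas); split(replicas, replicas_array, ",")
      if (leader != replicas_array[1]) {
        # Put the separator in front of every entry except the first one
        separator = (entries == "") ? "" : ",\n"
        entries = entries separator "{\"topic\": \"" topic "\", \"partition\": " partition "}"
      }
    }
    END {
      # Print nothing when every partition is already on its preferred leader
      if (entries != "") print "{\"partitions\": [\n" entries "\n]}"
    }'
}

# Tab-separated sample rows in the same layout as the describe output
printf '\tTopic: hydra\tPartition: %s\tLeader: %s\tReplicas: %s\tIsr: %s\n' \
  1 3 3,2,5 3,2,5 \
  2 3 5,3,1 3,1,5 \
  7 1 5,1,4 1,4,5 | election_json
```

With these sample rows the function prints entries for partitions 2 and 7 only, because partition 1 is already led by its first replica.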

Determine preferred leader

List the topic details to determine the preferred leader. The rule is that the leader should be the same as the first replica in the Replicas list.

$ bin/kafka-topics.sh --bootstrap-server kafka1.example.org:9092 --describe --topic hydra
Topic: hydra    PartitionCount: 10      ReplicationFactor: 3    Configs: segment.bytes=1073741824,message.format.version=2.0-IV1
        Topic: hydra    Partition: 0    Leader: 2       Replicas: 2,4,3 Isr: 2,4,3
        Topic: hydra    Partition: 1    Leader: 3       Replicas: 3,2,5 Isr: 3,2,5
        Topic: hydra    Partition: 2    Leader: 5       Replicas: 5,3,1 Isr: 5,3,1
        Topic: hydra    Partition: 3    Leader: 1       Replicas: 1,5,4 Isr: 1,5,4
        Topic: hydra    Partition: 4    Leader: 4       Replicas: 4,1,2 Isr: 4,1,2
        Topic: hydra    Partition: 5    Leader: 2       Replicas: 2,3,5 Isr: 2,3,5
        Topic: hydra    Partition: 6    Leader: 3       Replicas: 3,5,1 Isr: 3,5,1
        Topic: hydra    Partition: 7    Leader: 5       Replicas: 5,1,4 Isr: 5,1,4
        Topic: hydra    Partition: 8    Leader: 1       Replicas: 1,4,2 Isr: 1,4,2
        Topic: hydra    Partition: 9    Leader: 4       Replicas: 4,2,3 Isr: 4,2,3
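The rule can be expressed as a one-line comparison. The following is a minimal sketch in plain bash; the function name is_preferred_leader is hypothetical, and the second call uses a made-up mismatch rather than a value from the output above.

```shell
#!/bin/bash
# is_preferred_leader LEADER REPLICAS (hypothetical helper)
# Succeeds when LEADER equals the first entry of the comma-separated REPLICAS list.
is_preferred_leader() {
  local leader="$1"
  local replicas="$2"
  # ${replicas%%,*} removes everything from the first comma onwards
  [ "$leader" = "${replicas%%,*}" ]
}

# Partition 0 from the output above: leader 2, replicas 2,4,3
if is_preferred_leader 2 "2,4,3"; then
  echo "leader 2, replicas 2,4,3: preferred leader"
fi

# Made-up mismatch: leader 3 for replicas 5,3,1
if ! is_preferred_leader 3 "5,3,1"; then
  echo "leader 3, replicas 5,3,1: not the preferred leader"
fi
```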

Break things

Restart the broker with id 5 and list the topic details again to see that the leader changes for the partitions it was leading.

$ ./bin/kafka-topics.sh --bootstrap-server kafka1.example.org:9092 --describe --topic hydra
Topic: hydra    PartitionCount: 10      ReplicationFactor: 3    Configs: segment.bytes=1073741824,message.format.version=2.0-IV1
        Topic: hydra    Partition: 0    Leader: 2       Replicas: 2,4,3 Isr: 4,3,2
        Topic: hydra    Partition: 1    Leader: 3       Replicas: 3,2,5 Isr: 3,2,5
        Topic: hydra    Partition: 2    Leader: 3       Replicas: 5,3,1 Isr: 3,1,5
        Topic: hydra    Partition: 3    Leader: 1       Replicas: 1,5,4 Isr: 1,4,5
        Topic: hydra    Partition: 4    Leader: 4       Replicas: 4,1,2 Isr: 1,2,4
        Topic: hydra    Partition: 5    Leader: 2       Replicas: 2,3,5 Isr: 3,2,5
        Topic: hydra    Partition: 6    Leader: 3       Replicas: 3,5,1 Isr: 3,1,5
        Topic: hydra    Partition: 7    Leader: 1       Replicas: 5,1,4 Isr: 1,4,5
        Topic: hydra    Partition: 8    Leader: 1       Replicas: 1,4,2 Isr: 1,2,4
        Topic: hydra    Partition: 9    Leader: 4       Replicas: 4,2,3 Isr: 2,3,4
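To make the shift visible, the leaders can be tallied per broker. This is only a sketch: count_leaders and sample_describe are hypothetical names, and sample_describe replays the rows shown above as tab-separated text so that the example runs without a cluster.

```shell
#!/bin/bash
# Replays the describe rows above (partition, leader, replicas, isr) as tab-separated text
sample_describe() {
  printf '\tTopic: hydra\tPartition: %s\tLeader: %s\tReplicas: %s\tIsr: %s\n' \
    0 2 2,4,3 4,3,2 \
    1 3 3,2,5 3,2,5 \
    2 3 5,3,1 3,1,5 \
    3 1 1,5,4 1,4,5 \
    4 4 4,1,2 1,2,4 \
    5 2 2,3,5 3,2,5 \
    6 3 3,5,1 3,1,5 \
    7 1 5,1,4 1,4,5 \
    8 1 1,4,2 1,2,4 \
    9 4 4,2,3 2,3,4
}

# Counts how many partitions each broker currently leads
count_leaders() {
  awk -F"\t" '/^\tTopic/ {
      leader=$4; sub(/Leader: /, "", leader)
      count[leader]++
    }
    END { for (broker in count) print "broker " broker ": " count[broker] " partition(s)" }' | sort
}

sample_describe | count_leaders
```

Broker 5 does not appear in the result at all, because it no longer leads any partition of this topic; brokers 1 and 3 each picked up one of its partitions.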

Use the check_preferred_leader.sh script created earlier to determine programmatically whether each partition is on its preferred leader.

$ bash check_preferred_leader.sh hydra
Topic: hydra
Partition 0 with leader 2 and replicas 2,4,3
Partition 1 with leader 3 and replicas 3,2,5
Partition 2 with leader 3 and replicas 5,3,1 (the leader is not the first replica) 
Partition 3 with leader 1 and replicas 1,5,4
Partition 4 with leader 4 and replicas 4,1,2
Partition 5 with leader 2 and replicas 2,3,5
Partition 6 with leader 3 and replicas 3,5,1
Partition 7 with leader 1 and replicas 5,1,4 (the leader is not the first replica) 
Partition 8 with leader 1 and replicas 1,4,2
Partition 9 with leader 4 and replicas 4,2,3
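For monitoring, a single number is often easier to act on than the full listing. The sketch below counts the partitions that are off their preferred leader; count_non_preferred and the sample rows are assumptions for illustration, and in practice the input would come from kafka-topics.sh --describe.

```shell
#!/bin/bash
# count_non_preferred (hypothetical helper): reads describe output on stdin and
# prints the number of partitions whose leader is not the first replica.
count_non_preferred() {
  awk -F"\t" '/^\tTopic/ {
      leader=$4; sub(/Leader: /, "", leader)
      replicas=$5; sub(/Replicas: /, "", replicas); split(replicas, replicas_array, ",")
      if (leader != replicas_array[1]) count++
    }
    END { print count + 0 }'
}

# Tab-separated sample rows for partitions 1, 2 and 7
sample_rows() {
  printf '\tTopic: hydra\tPartition: %s\tLeader: %s\tReplicas: %s\tIsr: %s\n' \
    1 3 3,2,5 3,2,5 \
    2 3 5,3,1 3,1,5 \
    7 1 5,1,4 1,4,5
}

non_preferred="$(sample_rows | count_non_preferred)"
if [ "$non_preferred" -gt 0 ]; then
  echo "WARNING: $non_preferred partition(s) not on the preferred leader"
fi
```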

Restore preferred leader for every topic

Initiate leader election for every topic.

$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --all-topic-partitions

Restore preferred leader for specific topic partitions

Initiate leader election for specific topic partitions.

$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --topic hydra --partition 2
Successfully completed leader election (PREFERRED) for partitions hydra-2
$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --topic hydra --partition 7
Successfully completed leader election (PREFERRED) for partitions hydra-7
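When several partitions need attention, the per-partition commands can be generated rather than typed. The following sketch only prints the commands so they can be reviewed before anything is executed; print_election_commands is a hypothetical helper, and the script path and bootstrap server are the ones used in this article.

```shell
#!/bin/bash
# Values copied from the commands in this article; adjust for your environment
election_script="./bin/kafka-leader-election.sh"
bootstrap_server="kafka1.example.org:9092"

# print_election_commands TOPIC PARTITION... (hypothetical helper)
# Prints one preferred leader election command per partition without running it.
print_election_commands() {
  local topic="$1"
  shift
  local partition
  for partition in "$@"; do
    echo "$election_script --bootstrap-server $bootstrap_server --election-type preferred --topic $topic --partition $partition"
  done
}

# The two partitions flagged by check_preferred_leader.sh
print_election_commands hydra 2 7
```

Piping the result to bash would execute the commands; for more than a handful of partitions the JSON file approach is likely the better fit, since it needs a single invocation.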

Restore preferred leader for specific topic

Generate a JSON file listing the partitions that require leader election.

$ bash create_leader_election_json.sh hydra | tee /tmp/hydra.json
{"partitions": [
{"topic": "hydra", "partition": 2},
{"topic": "hydra", "partition": 7}
]}

Execute leader election.

$ ./bin/kafka-leader-election.sh --bootstrap-server kafka1.example.org:9092 --election-type preferred --path-to-json-file /tmp/hydra.json
Successfully completed leader election (PREFERRED) for partitions hydra-2, hydra-7

Additional notes

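Kafka also performs this process automatically. When auto.leader.rebalance.enable is true (the default), the controller checks for leader imbalance every leader.imbalance.check.interval.seconds (300 seconds, that is five minutes, by default) and triggers a preferred leader election for any broker whose imbalance exceeds leader.imbalance.per.broker.percentage (10 percent by default). Inspect the default parameters.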

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 1 --describe --all | grep "leader\|All"
All configs for broker 1 are:
  auto.leader.rebalance.enable=true sensitive=false synonyms={DEFAULT_CONFIG:auto.leader.rebalance.enable=true}
  leader.imbalance.check.interval.seconds=300 sensitive=false synonyms={DEFAULT_CONFIG:leader.imbalance.check.interval.seconds=300}
  unclean.leader.election.enable=false sensitive=false synonyms={DEFAULT_CONFIG:unclean.leader.election.enable=false}
  leader.imbalance.per.broker.percentage=10 sensitive=false synonyms={DEFAULT_CONFIG:leader.imbalance.per.broker.percentage=10}
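The leader.imbalance.per.broker.percentage threshold is compared against a per-broker ratio: the share of partitions for which the broker is the preferred leader but does not currently lead. The sketch below estimates that ratio for a single topic, whereas the controller considers every partition on the broker, so treat the numbers as an approximation. The names broker_imbalance and sample_describe are hypothetical, and the sample rows replay the state after broker 5 was restarted.

```shell
#!/bin/bash
# Replays the describe rows after the restart of broker 5 as tab-separated text
sample_describe() {
  printf '\tTopic: hydra\tPartition: %s\tLeader: %s\tReplicas: %s\tIsr: %s\n' \
    0 2 2,4,3 4,3,2 \
    1 3 3,2,5 3,2,5 \
    2 3 5,3,1 3,1,5 \
    3 1 1,5,4 1,4,5 \
    4 4 4,1,2 1,2,4 \
    5 2 2,3,5 3,2,5 \
    6 3 3,5,1 3,1,5 \
    7 1 5,1,4 1,4,5 \
    8 1 1,4,2 1,2,4 \
    9 4 4,2,3 2,3,4
}

# For each broker: partitions where it is the first replica, and how many of
# those are currently led by another broker
broker_imbalance() {
  awk -F"\t" '/^\tTopic/ {
      leader=$4; sub(/Leader: /, "", leader)
      replicas=$5; sub(/Replicas: /, "", replicas); split(replicas, replicas_array, ",")
      preferred[replicas_array[1]]++
      if (leader != replicas_array[1]) moved[replicas_array[1]]++
    }
    END {
      for (broker in preferred)
        printf "broker %s: %d of %d preferred partitions led elsewhere (%d%%)\n",
               broker, moved[broker], preferred[broker], 100 * moved[broker] / preferred[broker]
    }' | sort
}

sample_describe | broker_imbalance
```

For this sample, broker 5 is at 100 percent and every other broker at 0 percent, so broker 5 is well above the 10 percent default and would be rebalanced at the next automatic check.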