Reassign Kafka topic partitions.
Step 1
Create a sample topic.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic cerberus --replication-factor 3 --partitions 15
Created topic cerberus.
Display topic details. Notice that there are five brokers.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic cerberus
Topic: cerberus TopicId: Ngs9WPBnQfGqQUHiDOQc7A PartitionCount: 15 ReplicationFactor: 3 Configs: segment.bytes=1073741824
  Topic: cerberus Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
  Topic: cerberus Partition: 1 Leader: 5 Replicas: 5,2,3 Isr: 5,2,3
  Topic: cerberus Partition: 2 Leader: 4 Replicas: 4,3,5 Isr: 4,3,5
  Topic: cerberus Partition: 3 Leader: 1 Replicas: 1,5,4 Isr: 1,5,4
  Topic: cerberus Partition: 4 Leader: 2 Replicas: 2,4,1 Isr: 2,4,1
  Topic: cerberus Partition: 5 Leader: 3 Replicas: 3,2,5 Isr: 3,2,5
  Topic: cerberus Partition: 6 Leader: 5 Replicas: 5,3,4 Isr: 5,3,4
  Topic: cerberus Partition: 7 Leader: 4 Replicas: 4,5,1 Isr: 4,5,1
  Topic: cerberus Partition: 8 Leader: 1 Replicas: 1,4,2 Isr: 1,4,2
  Topic: cerberus Partition: 9 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
  Topic: cerberus Partition: 10 Leader: 3 Replicas: 3,5,4 Isr: 3,5,4
  Topic: cerberus Partition: 11 Leader: 5 Replicas: 5,4,1 Isr: 5,4,1
  Topic: cerberus Partition: 12 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2
  Topic: cerberus Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
  Topic: cerberus Partition: 14 Leader: 2 Replicas: 2,3,5 Isr: 2,3,5
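The broker count can also be read from the output mechanically by collecting the distinct IDs in the Replicas column. The following is a minimal sketch, assuming the --describe output format shown above; the function name list_brokers is made up for this example, and the sample text is a shortened copy of that output.

```shell
# list_brokers: hypothetical helper, reads `kafka-topics.sh --describe` output on stdin
# and prints every distinct broker ID that follows a "Replicas:" label.
list_brokers() {
  awk '{
    for (i = 1; i < NF; i++) {
      if ($i == "Replicas:") {
        n = split($(i + 1), ids, ",")
        for (j = 1; j <= n; j++) print ids[j]
      }
    }
  }' | sort -n | uniq | paste -sd' ' -
}

# Shortened sample copied from the describe output (first three partitions).
sample='Topic: cerberus Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: cerberus Partition: 1 Leader: 5 Replicas: 5,2,3 Isr: 5,2,3
Topic: cerberus Partition: 2 Leader: 4 Replicas: 4,3,5 Isr: 4,3,5'

printf '%s\n' "$sample" | list_brokers   # prints: 1 2 3 4 5
```

Against a live cluster, the same function could be placed at the end of the --describe command with a pipe. It only sees brokers that hold a replica of the topic, so a broker without any replica of cerberus would not be listed.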
Step 2
Create a JSON file with a list of topics that we want to reassign.
$ cat << EOF | tee /tmp/topics.json
{
  "topics": [
    {"topic": "cerberus"}
  ],
  "version":1
}
EOF
Step 3
Generate partition reassignment configuration for four brokers.
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --generate --topics-to-move-json-file /tmp/topics.json --broker-list 1,2,3,4
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cerberus","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":1,"replicas":[5,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":2,"replicas":[4,3,5],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":3,"replicas":[1,5,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":4,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":5,"replicas":[3,2,5],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":6,"replicas":[5,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":7,"replicas":[4,5,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":8,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":9,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":10,"replicas":[3,5,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":11,"replicas":[5,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":12,"replicas":[4,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":13,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":14,"replicas":[2,3,5],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"cerberus","partition":0,"replicas":[3,2,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":1,"replicas":[4,3,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":2,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":3,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":4,"replicas":[3,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":5,"replicas":[4,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":6,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":7,"replicas":[2,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":8,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":9,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":10,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":11,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":12,"replicas":[3,2,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":13,"replicas":[4,3,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":14,"replicas":[1,4,2],"log_dirs":["any","any","any"]}]}
Store these assignments in dedicated JSON files.
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --generate --topics-to-move-json-file /tmp/topics.json --broker-list 1,2,3,4 | tee >(awk -F: '/Current partition replica assignment/ { getline; print $0 }' | jq . > /tmp/topic.current.json) >(awk -F: '/Proposed partition reassignment configuration/ { getline; print $0 }' | jq . > /tmp/topic.proposed.json)
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cerberus","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":1,"replicas":[5,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":2,"replicas":[4,3,5],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":3,"replicas":[1,5,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":4,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":5,"replicas":[3,2,5],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":6,"replicas":[5,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":7,"replicas":[4,5,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":8,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":9,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":10,"replicas":[3,5,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":11,"replicas":[5,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":12,"replicas":[4,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":13,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":14,"replicas":[2,3,5],"log_dirs":["any","any","any"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"cerberus","partition":0,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":1,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":2,"replicas":[3,2,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":3,"replicas":[4,3,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":4,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":5,"replicas":[2,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":6,"replicas":[3,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":7,"replicas":[4,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":8,"replicas":[1,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":9,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":10,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":11,"replicas":[4,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":12,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":13,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":14,"replicas":[3,2,4],"log_dirs":["any","any","any"]}]}
I will remove the log_dirs field, as it is optional and we do not want to specify log directory locations.
$ jq 'del(.partitions[] | .log_dirs)' /tmp/topic.proposed.json > /tmp/topic.reassign.json
Step 4
Inspect created files.
Topic list.
$ cat /tmp/topics.json
{
  "topics": [
    {"topic": "cerberus"}
  ],
  "version":1
}
Current partition replica assignment.
$ cat /tmp/topic.current.json
{ "version": 1, "partitions": [ { "topic": "cerberus", "partition": 0, "replicas": [ 3, 1, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 1, "replicas": [ 5, 2, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 2, "replicas": [ 4, 3, 5 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 3, "replicas": [ 1, 5, 4 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 4, "replicas": [ 2, 4, 1 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 5, "replicas": [ 3, 2, 5 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 6, "replicas": [ 5, 3, 4 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 7, "replicas": [ 4, 5, 1 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 8, "replicas": [ 1, 4, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 9, "replicas": [ 2, 1, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 10, "replicas": [ 3, 5, 4 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 11, "replicas": [ 5, 4, 1 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 12, "replicas": [ 4, 1, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 13, "replicas": [ 1, 2, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 14, "replicas": [ 2, 3, 5 ], "log_dirs": [ "any", "any", "any" ] } ] }
Proposed partition reassignment configuration.
$ cat /tmp/topic.proposed.json
{ "version": 1, "partitions": [ { "topic": "cerberus", "partition": 0, "replicas": [ 1, 4, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 1, "replicas": [ 2, 1, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 2, "replicas": [ 3, 2, 4 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 3, "replicas": [ 4, 3, 1 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 4, "replicas": [ 1, 2, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 5, "replicas": [ 2, 3, 4 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 6, "replicas": [ 3, 4, 1 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 7, "replicas": [ 4, 1, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 8, "replicas": [ 1, 3, 4 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 9, "replicas": [ 2, 4, 1 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 10, "replicas": [ 3, 1, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 11, "replicas": [ 4, 2, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 12, "replicas": [ 1, 4, 2 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 13, "replicas": [ 2, 1, 3 ], "log_dirs": [ "any", "any", "any" ] }, { "topic": "cerberus", "partition": 14, "replicas": [ 3, 2, 4 ], "log_dirs": [ "any", "any", "any" ] } ] }
The altered file.
$ cat /tmp/topic.reassign.json
{ "version": 1, "partitions": [ { "topic": "cerberus", "partition": 0, "replicas": [ 1, 4, 2 ] }, { "topic": "cerberus", "partition": 1, "replicas": [ 2, 1, 3 ] }, { "topic": "cerberus", "partition": 2, "replicas": [ 3, 2, 4 ] }, { "topic": "cerberus", "partition": 3, "replicas": [ 4, 3, 1 ] }, { "topic": "cerberus", "partition": 4, "replicas": [ 1, 2, 3 ] }, { "topic": "cerberus", "partition": 5, "replicas": [ 2, 3, 4 ] }, { "topic": "cerberus", "partition": 6, "replicas": [ 3, 4, 1 ] }, { "topic": "cerberus", "partition": 7, "replicas": [ 4, 1, 2 ] }, { "topic": "cerberus", "partition": 8, "replicas": [ 1, 3, 4 ] }, { "topic": "cerberus", "partition": 9, "replicas": [ 2, 4, 1 ] }, { "topic": "cerberus", "partition": 10, "replicas": [ 3, 1, 2 ] }, { "topic": "cerberus", "partition": 11, "replicas": [ 4, 2, 3 ] }, { "topic": "cerberus", "partition": 12, "replicas": [ 1, 4, 2 ] }, { "topic": "cerberus", "partition": 13, "replicas": [ 2, 1, 3 ] }, { "topic": "cerberus", "partition": 14, "replicas": [ 3, 2, 4 ] } ] }
You can alter this file further, since it is the file that will be used for the reassignment.
For example, you can remove a specific broker, here the one with ID 9, from every replica list using the following command.
$ jq 'del(.partitions[] | .replicas[] | select(. == 9))' /tmp/topic.reassign.json > /tmp/topic.reassign.delete.9.json
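Deleting a broker from the replica lists shortens the list of every partition that referenced it, so it is worth checking the replica counts before executing such a file. The sketch below assumes jq is installed, as in the rest of this walkthrough; the function name replica_counts, the temporary files, and the choice of broker 4 are only for illustration.

```shell
# replica_counts: hypothetical helper, prints "<topic>-<partition> <replica count>"
# for every partition in a reassignment JSON file.
replica_counts() {
  jq -r '.partitions[] | "\(.topic)-\(.partition) \(.replicas | length)"' "$1"
}

# Small sample in the same format as /tmp/topic.reassign.json.
sample_file=$(mktemp)
cat > "$sample_file" << 'EOF'
{"version":1,"partitions":[
  {"topic":"cerberus","partition":0,"replicas":[1,4,2]},
  {"topic":"cerberus","partition":1,"replicas":[2,1,3]},
  {"topic":"cerberus","partition":2,"replicas":[3,2,4]}
]}
EOF

# Remove broker 4 (an arbitrary choice for this sketch), then count what is left.
trimmed_file=$(mktemp)
jq 'del(.partitions[] | .replicas[] | select(. == 4))' "$sample_file" > "$trimmed_file"
replica_counts "$trimmed_file"
# prints:
# cerberus-0 2
# cerberus-1 3
# cerberus-2 2
```

Partitions 0 and 2 are left with two replicas in this sample, which is what would be applied if the trimmed file were passed to --execute.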
Step 5
Inspect the reassignment status.
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file /tmp/topic.reassign.json
Status of partition reassignment:
There is no active reassignment of partition cerberus-0, but replica set is 3,1,2 rather than 1,4,2.
There is no active reassignment of partition cerberus-1, but replica set is 5,2,3 rather than 2,1,3.
There is no active reassignment of partition cerberus-2, but replica set is 4,3,5 rather than 3,2,4.
There is no active reassignment of partition cerberus-3, but replica set is 1,5,4 rather than 4,3,1.
There is no active reassignment of partition cerberus-4, but replica set is 2,4,1 rather than 1,2,3.
There is no active reassignment of partition cerberus-5, but replica set is 3,2,5 rather than 2,3,4.
There is no active reassignment of partition cerberus-6, but replica set is 5,3,4 rather than 3,4,1.
There is no active reassignment of partition cerberus-7, but replica set is 4,5,1 rather than 4,1,2.
There is no active reassignment of partition cerberus-8, but replica set is 1,4,2 rather than 1,3,4.
There is no active reassignment of partition cerberus-9, but replica set is 2,1,3 rather than 2,4,1.
There is no active reassignment of partition cerberus-10, but replica set is 3,5,4 rather than 3,1,2.
There is no active reassignment of partition cerberus-11, but replica set is 5,4,1 rather than 4,2,3.
There is no active reassignment of partition cerberus-12, but replica set is 4,1,2 rather than 1,4,2.
There is no active reassignment of partition cerberus-13, but replica set is 1,2,3 rather than 2,1,3.
There is no active reassignment of partition cerberus-14, but replica set is 2,3,5 rather than 3,2,4.
Clearing broker-level throttles on brokers 5,1,2,3,4
Clearing topic-level throttles on topic cerberus
Execute the reassignment process.
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --execute --reassignment-json-file /tmp/topic.reassign.json
Current partition replica assignment
{"version":1,"partitions":[{"topic":"cerberus","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":1,"replicas":[5,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":2,"replicas":[4,3,5],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":3,"replicas":[1,5,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":4,"replicas":[2,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":5,"replicas":[3,2,5],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":6,"replicas":[5,3,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":7,"replicas":[4,5,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":8,"replicas":[1,4,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":9,"replicas":[2,1,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":10,"replicas":[3,5,4],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":11,"replicas":[5,4,1],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":12,"replicas":[4,1,2],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":13,"replicas":[1,2,3],"log_dirs":["any","any","any"]},{"topic":"cerberus","partition":14,"replicas":[2,3,5],"log_dirs":["any","any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for cerberus-0,cerberus-1,cerberus-2,cerberus-3,cerberus-4,cerberus-5,cerberus-6,cerberus-7,cerberus-8,cerberus-9,cerberus-10,cerberus-11,cerberus-12,cerberus-13,cerberus-14
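The output asks you to save the previous assignment for a rollback. The file /tmp/topic.current.json from Step 3 already holds the same data, but it can also be captured from the --execute output itself. This is a minimal sketch, assuming the JSON is the first line starting with "{" after the "Current partition replica assignment" label, whether or not blank lines separate them; the function name extract_rollback_json is hypothetical and the sample is shortened to one partition.

```shell
# extract_rollback_json: hypothetical helper, reads the --execute output on stdin and
# prints the first JSON line that follows the "Current partition replica assignment" label.
extract_rollback_json() {
  awk '
    /Current partition replica assignment/ { seen = 1; next }
    seen && !printed && substr($0, 1, 1) == "{" { print; printed = 1 }
  '
}

# Shortened sample of the --execute output (one partition only).
sample='Current partition replica assignment

{"version":1,"partitions":[{"topic":"cerberus","partition":0,"replicas":[3,1,2],"log_dirs":["any","any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for cerberus-0'

printf '%s\n' "$sample" | extract_rollback_json
```

Redirecting the function's output to a file gives a document that can later be passed to --execute through --reassignment-json-file to restore the earlier replica assignment.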
Verify the reassignment process.
$ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --verify --reassignment-json-file /tmp/topic.reassign.json
Status of partition reassignment:
Reassignment of partition cerberus-0 is complete.
Reassignment of partition cerberus-1 is complete.
Reassignment of partition cerberus-2 is complete.
Reassignment of partition cerberus-3 is complete.
Reassignment of partition cerberus-4 is complete.
Reassignment of partition cerberus-5 is complete.
Reassignment of partition cerberus-6 is complete.
Reassignment of partition cerberus-7 is complete.
Reassignment of partition cerberus-8 is complete.
Reassignment of partition cerberus-9 is complete.
Reassignment of partition cerberus-10 is complete.
Reassignment of partition cerberus-11 is complete.
Reassignment of partition cerberus-12 is complete.
Reassignment of partition cerberus-13 is complete.
Reassignment of partition cerberus-14 is complete.
Clearing broker-level throttles on brokers 5,1,2,3,4
Clearing topic-level throttles on topic cerberus
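For larger topics the reassignment may take a while, so the --verify output can be checked by a script instead of by eye. The following sketch treats the reassignment as finished only when every per-partition status line ends with "is complete."; the function name reassignment_complete is hypothetical, and the sample lines are copied from the two --verify outputs in this walkthrough.

```shell
# reassignment_complete: hypothetical helper, reads --verify output on stdin and
# succeeds only if there is at least one per-partition status line and all of
# them end with "is complete."
reassignment_complete() {
  awk '
    /[Rr]eassignment of partition / { total++; if ($0 ~ /is complete\.$/) ok++ }
    END { exit !(total > 0 && total == ok) }
  '
}

finished='Status of partition reassignment:
Reassignment of partition cerberus-0 is complete.
Reassignment of partition cerberus-1 is complete.'

pending='Status of partition reassignment:
There is no active reassignment of partition cerberus-0, but replica set is 3,1,2 rather than 1,4,2.
Reassignment of partition cerberus-1 is complete.'

printf '%s\n' "$finished" | reassignment_complete && echo "finished: complete"
printf '%s\n' "$pending" | reassignment_complete || echo "pending: not complete"
```

In a real run, the --verify command could be piped into the function inside an until loop with a short sleep between attempts, so that the script continues only once all partitions report completion.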
Inspect topic details after the reassignment process.
$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic cerberus
Topic: cerberus TopicId: Ngs9WPBnQfGqQUHiDOQc7A PartitionCount: 15 ReplicationFactor: 3 Configs: segment.bytes=1073741824
  Topic: cerberus Partition: 0 Leader: 1 Replicas: 1,4,2 Isr: 1,2,4
  Topic: cerberus Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,3,1
  Topic: cerberus Partition: 2 Leader: 3 Replicas: 3,2,4 Isr: 4,3,2
  Topic: cerberus Partition: 3 Leader: 4 Replicas: 4,3,1 Isr: 1,4,3
  Topic: cerberus Partition: 4 Leader: 1 Replicas: 1,2,3 Isr: 2,1,3
  Topic: cerberus Partition: 5 Leader: 2 Replicas: 2,3,4 Isr: 3,2,4
  Topic: cerberus Partition: 6 Leader: 3 Replicas: 3,4,1 Isr: 3,4,1
  Topic: cerberus Partition: 7 Leader: 4 Replicas: 4,1,2 Isr: 4,1,2
  Topic: cerberus Partition: 8 Leader: 1 Replicas: 1,3,4 Isr: 1,4,3
  Topic: cerberus Partition: 9 Leader: 2 Replicas: 2,4,1 Isr: 2,1,4
  Topic: cerberus Partition: 10 Leader: 3 Replicas: 3,1,2 Isr: 1,2,3
  Topic: cerberus Partition: 11 Leader: 4 Replicas: 4,2,3 Isr: 2,3,4
  Topic: cerberus Partition: 12 Leader: 1 Replicas: 1,4,2 Isr: 4,1,2
  Topic: cerberus Partition: 13 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3
  Topic: cerberus Partition: 14 Leader: 3 Replicas: 3,2,4 Isr: 2,3,4
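To see how leadership is spread after the move, the Leader column can be tallied per broker. This is a small sketch that assumes the --describe format shown above; the function name leaders_per_broker is made up for this example, and the sample is the first five partitions of that output.

```shell
# leaders_per_broker: hypothetical helper, reads `kafka-topics.sh --describe` output
# on stdin and prints "<broker id> <number of partitions it leads>", sorted by ID.
leaders_per_broker() {
  awk '{
    for (i = 1; i < NF; i++)
      if ($i == "Leader:") count[$(i + 1)]++
  }
  END { for (b in count) print b, count[b] }' | sort -n
}

# First five partitions of the describe output after the reassignment.
sample='Topic: cerberus Partition: 0 Leader: 1 Replicas: 1,4,2 Isr: 1,2,4
Topic: cerberus Partition: 1 Leader: 2 Replicas: 2,1,3 Isr: 2,3,1
Topic: cerberus Partition: 2 Leader: 3 Replicas: 3,2,4 Isr: 4,3,2
Topic: cerberus Partition: 3 Leader: 4 Replicas: 4,3,1 Isr: 1,4,3
Topic: cerberus Partition: 4 Leader: 1 Replicas: 1,2,3 Isr: 2,1,3'

printf '%s\n' "$sample" | leaders_per_broker
# prints:
# 1 2
# 2 1
# 3 1
# 4 1
```

Applied to the full output above, brokers 1, 2 and 3 each lead four partitions, broker 4 leads three, and broker 5 no longer appears.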