Skip to content

Instantly share code, notes, and snippets.

@nunosilva800
Created January 14, 2022 15:35
Plan Kafka cluster expansion by increased replication factor. Assumes new broker ids are shifted by 5.
require 'optparse'
require 'json'
options = {}
op = OptionParser.new do |opts|
opts.banner = "Usage: example.rb [options]"
opts.on("-c", "--context REQUIRED", String, "k8s context") do |v|
options[:context] = v
end
opts.on("-n", "--namespace REQUIRED", String, "k8s namepace") do |v|
options[:namespace] = v
end
opts.on("-t", "--topics REQUIRED", Array, "kafka topics") do |v|
options[:topics] = v
end
opts.on("-o", "--output REQUIRED", String, "output file") do |v|
options[:output] = v
end
end
op.parse!
if options[:context].nil? || options[:namespace].nil? || options[:topics].nil?
puts op.help
exit 1
end
file = {
version: 1,
partitions: []
}
# Parse this output to extract partition number and replicas:
#
# "Picked up JAVA_TOOL_OPTIONS: -XX:+UseContainerSupport
# Topic:orders\tPartitionCount:15\tReplicationFactor:3\tConfigs:
# \tTopic: orders\tPartition: 0\tLeader: 9\tReplicas: 9,6,8\tIsr: 6,8,9
# \tTopic: orders\tPartition: 1\tLeader: 6\tReplicas: 6,8,5\tIsr: 5,6,8
# \tTopic: orders\tPartition: 2\tLeader: 8\tReplicas: 8,5,7\tIsr: 7,5,8
# \tTopic: orders\tPartition: 3\tLeader: 5\tReplicas: 5,7,9\tIsr: 7,5,9
# \tTopic: orders\tPartition: 4\tLeader: 7\tReplicas: 7,9,6\tIsr: 9,6,7
# \tTopic: orders\tPartition: 5\tLeader: 9\tReplicas: 9,8,5\tIsr: 5,9,8
# \tTopic: orders\tPartition: 6\tLeader: 6\tReplicas: 6,5,7\tIsr: 7,5,6
# \tTopic: orders\tPartition: 7\tLeader: 8\tReplicas: 8,7,9\tIsr: 7,8,9
# \tTopic: orders\tPartition: 8\tLeader: 5\tReplicas: 5,9,6\tIsr: 5,6,9
# \tTopic: orders\tPartition: 9\tLeader: 7\tReplicas: 7,6,8\tIsr: 8,7,6
# \tTopic: orders\tPartition: 10\tLeader: 9\tReplicas: 9,5,7\tIsr: 7,5,9
# \tTopic: orders\tPartition: 11\tLeader: 6\tReplicas: 6,7,9\tIsr: 7,6,9
# \tTopic: orders\tPartition: 12\tLeader: 8\tReplicas: 8,9,6\tIsr: 6,8,9
# \tTopic: orders\tPartition: 13\tLeader: 5\tReplicas: 5,6,8\tIsr: 5,8,6
# \tTopic: orders\tPartition: 14\tLeader: 7\tReplicas: 7,8,5\tIsr: 8,5,7
#
options[:topics].each do |topic|
cmd_describe_topic = "kubectl --context #{options[:context]} --namespace #{options[:namespace]} " +
"exec -ti kafka-0 -c broker -- /opt/kafka/bin/kafka-topics.sh --zookeeper=zetcd:2181/kafka " +
"--describe --topic #{topic}"
res = %x[#{cmd_describe_topic}]
res.split("\n").each do |line|
md_partition = /Partition: \d*/.match(line.strip)
md_replicas = /Replicas: [\d*,]{5}/.match(line.strip)
if md_replicas
current_replicas = md_replicas.to_s.split(' ').last.split(',').map(&:to_i)
new_replicas = current_replicas.map { |p| (p + 5) }
file[:partitions] << {
topic: topic,
partition: md_partition.to_s.split(' ').last.to_i,
replicas: current_replicas + new_replicas,
}
end
end
end
# Write a nice JSON to feed into kafka-reassign-partitions.sh --reassignment-json-file
#
# {
# "version": 1,
# "partitions": [
# {
# "topic": "__consumer_offsets",
# "partition": 0,
# "replicas": [1,3,4,6,8,9]
# },
# ...
# ]
# }
File.open(options[:output], 'w') do |f|
f << JSON.pretty_generate(file)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment