Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafkat.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Gem::Specification.new do |s|
s.add_development_dependency "activesupport", ">= 2", "< 5"
s.add_development_dependency 'rake'
s.add_development_dependency 'simplecov', '~> 0.11.0'
s.add_development_dependency 'rspec', '~> 3.2.0'
s.add_development_dependency 'rspec', '~> 3.5.0'
s.add_development_dependency 'rspec-collection_matchers', '~> 1.1.0'
s.add_development_dependency 'factory_girl', '~> 4.5.0'
end
66 changes: 31 additions & 35 deletions lib/kafkat/command/set-replication-factor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ def run

assignments = []
topics.each do |_, t|
current_rf = t.partitions[0].replicas.size
if new_rf < current_rf
warn_reduce_brokers if opts[:brokers]
assignments += reduce_rf(t, current_rf, new_rf)
elsif new_rf > current_rf
assignments += increase_rf(t, current_rf, new_rf, broker_ids)
t.partitions.each do |p|
if p.replicas.size < new_rf
assignments << increase_rf(t, p, new_rf, broker_ids)
elsif p.replicas.size > new_rf
assignments << reduce_rf(t, p, new_rf)
end
end
end

Expand Down Expand Up @@ -96,7 +96,8 @@ def run
# For every partition, remove the last replica from the replica list.
# If the last replica is the leader, then the previous replica is removed instead.
#
def reduce_rf(topic, current_rf, new_rf)
def reduce_rf(topic, partition, new_rf)
current_rf = partition.replicas.size
delta_rf = current_rf - new_rf
if current_rf == 1
raise 'Current replication factor if 1. Cannot reduce further.'
Expand All @@ -105,24 +106,21 @@ def reduce_rf(topic, current_rf, new_rf)
raise "New replication factor (#{new_rf}) must be smaller than current replication factor (#{current_rf})"
end
assignments = []
topic.partitions.map do |p|
new_replicas = p.replicas

(0...delta_rf).each do |_|
(0...new_replicas.size).each do |i|
if new_replicas[new_replicas.size-1-i] != p.leader
new_replicas.delete_at(new_replicas.size-1-i)
break
end
new_replicas = partition.replicas

(0...delta_rf).each do |_|
(0...new_replicas.size).each do |i|
if new_replicas[new_replicas.size-1-i] != partition.leader
new_replicas.delete_at(new_replicas.size-1-i)
break
end
end
end

if new_replicas.size != new_rf
raise 'Unexpected state'
end
assignments << Assignment.new(topic.name, p.id, new_replicas)
if new_replicas.size != new_rf
raise 'Unexpected state'
end
assignments
Assignment.new(topic.name, partition.id, new_replicas)
end


Expand All @@ -132,33 +130,31 @@ def reduce_rf(topic, current_rf, new_rf)
#
# The count of new replicas assigned to the brokers is maintained in order to uniformly assign new replicas.
#
def increase_rf(topic, current_rf, new_rf, brokers)
def increase_rf(topic, partition, new_rf, brokers)
current_rf = partition.replicas.size
unless new_rf > current_rf
raise 'New replication factor must be greater than the current replication factor'
end

delta_rf = new_rf - current_rf
if delta_rf > brokers.size
raise "#{delta_rf} new replicas requested for topic #{p.topic_name} but only #{brokers.size} brokers available"
raise "#{delta_rf} new replicas requested for topic #{partition.topic_name} but only #{brokers.size} brokers available"
end

broker_counts = brokers.map { |b| {:id => b, :count => 0} }

assignments = []
topic.partitions.map do |p|
existing_replicas = p.replicas
pick_from = broker_counts.reject { |b| existing_replicas.include?(b[:id]) }
if delta_rf > pick_from.size
raise "Cannot create #{delta_rf} new replicas for partition #{p.topic_name}.#{p.id}, not enough brokers"
end
new_replicas = pick_from.sort { |a, b| a[:count] <=> b[:count] }[0...delta_rf]
new_replicas.each { |b| b[:count] += 1 }
existing_replicas = partition.replicas
pick_from = broker_counts.reject { |b| existing_replicas.include?(b[:id]) }
if delta_rf > pick_from.size
raise "Cannot create #{delta_rf} new replicas for partition #{partition.topic_name}.#{partition.id}, not enough brokers"
end
new_replicas = pick_from.sort { |a, b| a[:count] <=> b[:count] }[0...delta_rf]
new_replicas.each { |b| b[:count] += 1 }

final_replicas = existing_replicas + new_replicas.map { |b| b[:id] }
final_replicas = existing_replicas + new_replicas.map { |b| b[:id] }

assignments << Assignment.new(topic.name, p.id, final_replicas)
end
assignments
Assignment.new(topic.name, partition.id, final_replicas)
end

def warn_reduce_brokers
Expand Down