diff --git a/src/main/java/siftscience/kafka/tools/KafkaAssignmentStrategy.java b/src/main/java/siftscience/kafka/tools/KafkaAssignmentStrategy.java
index 1f1aea9..26ba830 100644
--- a/src/main/java/siftscience/kafka/tools/KafkaAssignmentStrategy.java
+++ b/src/main/java/siftscience/kafka/tools/KafkaAssignmentStrategy.java
@@ -1,7 +1,10 @@
 package siftscience.kafka.tools;
 
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -41,25 +44,36 @@ public static Map<Integer, List<Integer>> getRackAwareAssignment(
             String topicName, Map<Integer, List<Integer>> currentAssignment,
             Map<Integer, String> nodeRackAssignment, Set<Integer> nodes,
             Set<Integer> partitions, int replicationFactor, Context context) {
-        // Initialize nodes with capacities and nothing assigned
         int maxReplicas = getMaxReplicasPerNode(nodes, partitions, replicationFactor);
-        SortedMap<Integer, Node> nodeMap = createNodeMap(nodeRackAssignment, nodes, maxReplicas);
+        int capacity = maxReplicas;
+        while (capacity > 0) {
+            try {
+                // Initialize nodes with capacities and nothing assigned
+                SortedMap<Integer, Node> nodeMap = createNodeMap(nodeRackAssignment, nodes, maxReplicas);
 
-        // Using the current assignment, reassign as many partitions as each node can accept
-        fillNodesFromAssignment(currentAssignment, nodeMap);
+                // Using the current assignment, let each node retain as many of its current
+                // partitions as the capacity for this attempt allows
+                fillNodesFromAssignment(currentAssignment, nodeMap, capacity);
 
-        // Figure out the replicas that have not been assigned yet
-        Map<Integer, Integer> orphanedReplicas = getOrphanedReplicas(nodeMap, partitions,
-                replicationFactor);
+                // Figure out the replicas that have not been assigned yet
+                Map<Integer, Integer> orphanedReplicas = getOrphanedReplicas(nodeMap, partitions,
+                        replicationFactor);
 
-        // Assign those replicas to nodes that can accept them
-        assignOrphans(topicName, nodeMap, orphanedReplicas);
+                // Assign those replicas to nodes that can accept them
+                assignOrphans(topicName, nodeMap, orphanedReplicas);
 
-        // Order nodes for each partition such that leadership is relatively balanced
-        if (context == null) {
-            context = new Context();
+                // Order nodes for each partition such that leadership is relatively balanced
+                if (context == null) {
+                    context = new Context();
+                }
+                return computePreferenceLists(topicName, nodeMap, context);
+            } catch (IllegalStateException e) {
+                // Not all orphaned replicas could be placed; retry with a smaller capacity so
+                // that nodes retain fewer of their current partitions and more replicas are
+                // free to move.
+                capacity--;
+            }
         }
-        return computePreferenceLists(topicName, nodeMap, context);
+        // Failed to create a rack-aware assignment, even with capacity=1 per broker.
+        throw new IllegalStateException(
+                "Could not create a rack-aware assignment given the supplied constraints.");
     }
 
     private static int getMaxReplicasPerNode(
@@ -99,7 +113,7 @@ private static SortedMap<Integer, Node> createNodeMap(
     }
 
     private static void fillNodesFromAssignment(
-            Map<Integer, List<Integer>> assignment, Map<Integer, Node> nodeMap) {
+            Map<Integer, List<Integer>> assignment, Map<Integer, Node> nodeMap, int capacity) {
         // Assign existing partitions back to nodes in a round-robin fashion. This ensures that
         // we prevent (when possible) multiple replicas of the same partition moving around in the
         // cluster at the same time. It also helps ensure that we have orphaned replicas that nodes
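
The control flow introduced above is easier to follow in isolation: keep retrying the whole assignment, letting brokers retain fewer of their current replicas on each pass, until the orphans fit. A minimal standalone sketch of that retry pattern, with a hypothetical tryAssign() standing in for the body of the try block (none of these names are part of the patch):

    final class RetrySketch {
        // Hypothetical stand-in for createNodeMap + fillNodesFromAssignment + assignOrphans;
        // throws IllegalStateException when the orphaned replicas cannot all be placed.
        static void tryAssign(int retainCapacity) {
            if (retainCapacity > 2) {
                throw new IllegalStateException("orphans left over");
            }
        }

        public static void main(String[] args) {
            int capacity = 4; // getMaxReplicasPerNode(...) in the real code
            while (capacity > 0) {
                try {
                    tryAssign(capacity);
                    System.out.println("succeeded with retain capacity " + capacity);
                    return;
                } catch (IllegalStateException e) {
                    capacity--; // shrink the retention limit and retry from scratch
                }
            }
            throw new IllegalStateException("no assignment even with capacity 1");
        }
    }
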
@@ -117,7 +131,7 @@ private static void fillNodesFromAssignment(
                 if (nodeIt.hasNext()) {
                     int nodeId = nodeIt.next();
                     Node node = nodeMap.get(nodeId);
-                    if (node != null && node.canAccept(partition)) {
+                    if (node != null && node.canRetainPartition(partition, capacity)) {
                         // The node from the current assignment must still exist and be able to
                         // accept the partition.
                         node.accept(partition);
@@ -159,22 +173,40 @@ private static Map<Integer, Integer> getOrphanedReplicas(
         return orphanedReplicas;
     }
 
+    private static List<Node> nodesByLeastLoaded(Collection<Node> nodes) {
+        List<Node> nodeList = new ArrayList<Node>(nodes);
+        // Sort ascending by current load so that the emptiest nodes come first.
+        Collections.sort(nodeList, new Comparator<Node>() {
+            @Override
+            public int compare(Node o1, Node o2) {
+                if (o1.assignedPartitions.size() < o2.assignedPartitions.size()) {
+                    return -1;
+                } else if (o1.assignedPartitions.size() > o2.assignedPartitions.size()) {
+                    return 1;
+                }
+                return 0;
+            }
+        });
+        return nodeList;
+    }
+
     private static void assignOrphans(
             String topicName, SortedMap<Integer, Node> nodeMap,
             Map<Integer, Integer> orphanedReplicas) {
-        // Don't process nodes in the same order for all topics to ensure that topics with fewer
-        // replicas than nodes are equally likely to be assigned anywhere (and not overload the
-        // brokers with earlier IDs).
-        Integer[] nodeProcessingOrder = getNodeProcessingOrder(topicName, nodeMap.keySet());
-        List<Integer> nodeProcessingOrderList = Arrays.asList(nodeProcessingOrder);
-
         // Assign unassigned replicas to nodes that can accept them
         for (Map.Entry<Integer, Integer> e : orphanedReplicas.entrySet()) {
             int partition = e.getKey();
             int remainingReplicas = e.getValue();
-            Iterator<Integer> nodeIt = nodeProcessingOrderList.iterator();
+            List<Node> nodeList = nodesByLeastLoaded(nodeMap.values());
+            Iterator<Node> nodeIt = nodeList.iterator();
             while (nodeIt.hasNext() && remainingReplicas > 0) {
-                Node node = nodeMap.get(nodeIt.next());
+                Node node = nodeIt.next();
                 if (node.canAccept(partition)) {
                     node.accept(partition);
                     remainingReplicas--;
@@ -317,6 +349,22 @@ public Node(int id, int capacity, Rack rack) {
             this.assignedPartitions = Sets.newTreeSet();
         }
 
+        /**
+         * @param partition the id of a partition this broker hosted in the old assignment
+         * @param withCapacity the retention limit; may be lower than this broker's capacity
+         * @return whether this broker should keep hosting the given partition rather than
+         *         yielding it to another broker
+         */
+        public boolean canRetainPartition(int partition, int withCapacity) {
+            return !assignedPartitions.contains(partition) &&
+                    assignedPartitions.size() < withCapacity &&
+                    rack.canAccept(partition);
+        }
+
+        /**
+         * @param partition the id of a partition this broker does not currently host
+         * @return whether this broker can accept a replica of the given partition
+         */
         public boolean canAccept(int partition) {
             return !assignedPartitions.contains(partition) &&
                     assignedPartitions.size() < capacity &&
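
The behavioral change in assignOrphans is that orphaned replicas now flow to the emptiest brokers first, instead of following a per-topic shuffled broker order. A self-contained sketch of that placement policy under the same scenario the new test exercises (illustrative names only, not the patch's API):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    final class LeastLoadedSketch {
        public static void main(String[] args) {
            // {brokerId, replicaCount} pairs: brokers 10 and 11 are full, broker 12 is empty.
            List<int[]> brokers = new ArrayList<int[]>();
            brokers.add(new int[] {10, 6});
            brokers.add(new int[] {11, 6});
            brokers.add(new int[] {12, 0});
            for (int orphan = 0; orphan < 4; orphan++) {
                // Re-sort before each placement, as assignOrphans does per partition.
                Collections.sort(brokers, new Comparator<int[]>() {
                    @Override
                    public int compare(int[] a, int[] b) {
                        return a[1] - b[1];
                    }
                });
                int[] leastLoaded = brokers.get(0);
                leastLoaded[1]++; // place the orphaned replica on the emptiest broker
                System.out.println("orphan " + orphan + " -> broker " + leastLoaded[0]);
            }
        }
    }

All four orphans land on broker 12, which ends at a load of 4, the balanced outcome the new test asserts.
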
diff --git a/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java b/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
index 1d56448..beb7a12 100644
--- a/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
+++ b/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
@@ -3,6 +3,7 @@
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
@@ -81,15 +82,44 @@ public void testClusterExpansion() {
         }
     }
 
+    @Test
+    public void testClusterRebalance() {
+        String topic = "test";
+        // We have 3 brokers {10, 11, 12} and a topic with 6 partitions, each replicated
+        // twice, for a total load of 12 replicas. Brokers 10 and 11 each hold a replica of
+        // every partition, while broker 12 holds none. The assigner should produce an even
+        // assignment in which each broker ends up with 4 of the 12 replicas.
+        Map<Integer, List<Integer>> currentAssignment = new HashMap<Integer, List<Integer>>();
+        currentAssignment.put(0, ImmutableList.of(10, 11));
+        currentAssignment.put(1, ImmutableList.of(10, 11));
+        currentAssignment.put(2, ImmutableList.of(10, 11));
+        currentAssignment.put(3, ImmutableList.of(10, 11));
+        currentAssignment.put(4, ImmutableList.of(10, 11));
+        currentAssignment.put(5, ImmutableList.of(10, 11));
+
+        Set<Integer> brokers = ImmutableSet.of(10, 11, 12);
+        KafkaTopicAssigner assigner = new KafkaTopicAssigner();
+        Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
+                topic, currentAssignment, brokers, Collections.<Integer, String>emptyMap(), 2);
+        Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
+                currentAssignment, newAssignment, 1);
+        for (Map.Entry<Integer, Integer> brokerCount : brokerReplicaCounts.entrySet()) {
+            Assert.assertEquals(4, (int) brokerCount.getValue());
+        }
+    }
+
     @Test
     public void testDecommission() {
         String topic = "test";
+        // We start with 4 brokers {10, 11, 12, 13} ...
         Map<Integer, List<Integer>> currentAssignment = ImmutableMap.of(
                 0, (List<Integer>) ImmutableList.of(10, 11),
                 1, ImmutableList.of(11, 12),
                 2, ImmutableList.of(12, 13),
                 3, ImmutableList.of(13, 10)
        );
+        // ... and decommission broker 12, so the new broker set is {10, 11, 13}.
         Set<Integer> newBrokers = ImmutableSet.of(10, 11, 13);
         KafkaTopicAssigner assigner = new KafkaTopicAssigner();
         Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
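
The value 4 asserted in testClusterRebalance is just the balanced load: 6 partitions times a replication factor of 2 gives 12 replicas spread across 3 brokers. A quick check of that arithmetic, assuming getMaxReplicasPerNode (whose body is not shown in this diff) computes a ceiling division:

    final class BalanceMath {
        public static void main(String[] args) {
            int partitions = 6;
            int replicationFactor = 2;
            int brokers = 3;
            // Ceiling division: the smallest per-broker capacity that can hold all replicas.
            int perBroker = (partitions * replicationFactor + brokers - 1) / brokers;
            System.out.println(perBroker); // prints 4, the count asserted for every broker
        }
    }
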