96 changes: 72 additions & 24 deletions src/main/java/siftscience/kafka/tools/KafkaAssignmentStrategy.java
@@ -1,7 +1,10 @@
package siftscience.kafka.tools;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -41,25 +44,36 @@ public static Map<Integer, List<Integer>> getRackAwareAssignment(
String topicName, Map<Integer, List<Integer>> currentAssignment,
Map<Integer, String> nodeRackAssignment, Set<Integer> nodes, Set<Integer> partitions,
int replicationFactor, Context context) {
// Initialize nodes with capacities and nothing assigned
int maxReplicas = getMaxReplicasPerNode(nodes, partitions, replicationFactor);
SortedMap<Integer, Node> nodeMap = createNodeMap(nodeRackAssignment, nodes, maxReplicas);
int capacity = maxReplicas;
while (capacity > 0) {
try {
// Initialize nodes with capacities and nothing assigned

// Using the current assignment, reassign as many partitions as each node can accept
fillNodesFromAssignment(currentAssignment, nodeMap);
SortedMap<Integer, Node> nodeMap = createNodeMap(nodeRackAssignment, nodes, maxReplicas);

// Figure out the replicas that have not been assigned yet
Map<Integer, Integer> orphanedReplicas = getOrphanedReplicas(nodeMap, partitions,
replicationFactor);
// Using the current assignment, reassign as many partitions as each node can accept
fillNodesFromAssignment(currentAssignment, nodeMap, capacity);

// Assign those replicas to nodes that can accept them
assignOrphans(topicName, nodeMap, orphanedReplicas);
// Figure out the replicas that have not been assigned yet
Map<Integer, Integer> orphanedReplicas = getOrphanedReplicas(nodeMap, partitions,
replicationFactor);

// Order nodes for each partition such that leadership is relatively balanced
if (context == null) {
context = new Context();
// Assign those replicas to nodes that can accept them
assignOrphans(topicName, nodeMap, orphanedReplicas);

// Order nodes for each partition such that leadership is relatively balanced
if (context == null) {
context = new Context();
}
return computePreferenceLists(topicName, nodeMap, context);
} catch (IllegalStateException e) {
                // Retry with a smaller capacity: retaining fewer existing replicas
                // per node frees more replicas for rack-aware placement.
capacity--;
}
}
return computePreferenceLists(topicName, nodeMap, context);
// Failed to create a rack-aware assignment, even with capacity=1 per broker.
throw new IllegalStateException("Could not create a rack-aware assignment given the supplied constraints.");
}
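
This retry loop is the heart of the patch: rather than failing outright when assignOrphans cannot place every replica, the method rebuilds the node map and tries again with a progressively smaller retained-replica capacity. A minimal, self-contained sketch of that backoff pattern (tryAssign and its failure threshold are hypothetical stand-ins, not the patch's API):

    import java.util.Collections;
    import java.util.List;

    class CapacityBackoffSketch {
        // Stand-in for one assignment attempt: throws when some replica cannot
        // be placed while retaining this many existing replicas per node.
        static List<Integer> tryAssign(int capacity) {
            if (capacity > 2) {
                throw new IllegalStateException("cannot place all replicas");
            }
            return Collections.singletonList(capacity);
        }

        static List<Integer> assignWithBackoff(int maxReplicasPerNode) {
            for (int capacity = maxReplicasPerNode; capacity > 0; capacity--) {
                try {
                    return tryAssign(capacity); // success: stop retrying
                } catch (IllegalStateException e) {
                    // Retaining this many existing replicas left no valid
                    // placement; retry allowing one fewer per node.
                }
            }
            throw new IllegalStateException("no capacity admits a valid assignment");
        }

        public static void main(String[] args) {
            System.out.println(assignWithBackoff(5)); // prints [2]
        }
    }
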

private static int getMaxReplicasPerNode(
@@ -99,7 +113,7 @@ private static SortedMap<Integer, Node> createNodeMap(
}

private static void fillNodesFromAssignment(
Map<Integer, List<Integer>> assignment, Map<Integer, Node> nodeMap) {
Map<Integer, List<Integer>> assignment, Map<Integer, Node> nodeMap, int capacity) {
// Assign existing partitions back to nodes in a round-robin fashion. This ensures that
// we prevent (when possible) multiple replicas of the same partition moving around in the
// cluster at the same time. It also helps ensure that we have orphaned replicas that nodes
@@ -117,7 +131,7 @@ private static void fillNodesFromAssignment(
if (nodeIt.hasNext()) {
int nodeId = nodeIt.next();
Node node = nodeMap.get(nodeId);
if (node != null && node.canAccept(partition)) {
if (node != null && node.canRetainPartition(partition, capacity)) {
                    // The node from the current assignment must still exist and be able
                    // to retain the partition under the current capacity.
node.accept(partition);
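
The round-robin refill described in the comment above advances through each partition's existing replica list one position per pass, so nodes give up their excess replicas evenly instead of one node reclaiming its entire set first. A toy trace of just that visiting order (simplified: plain ints, and it skips the canRetainPartition check):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class RoundRobinRefillSketch {
        public static void main(String[] args) {
            // partition -> current replica list (broker ids)
            Map<Integer, List<Integer>> assignment = new TreeMap<Integer, List<Integer>>();
            assignment.put(0, Arrays.asList(10, 11));
            assignment.put(1, Arrays.asList(10, 11));

            // One cursor per partition; each outer pass re-seats one replica of
            // every partition before any partition gets its second replica back.
            Map<Integer, Iterator<Integer>> cursors = new TreeMap<Integer, Iterator<Integer>>();
            for (Map.Entry<Integer, List<Integer>> e : assignment.entrySet()) {
                cursors.put(e.getKey(), e.getValue().iterator());
            }
            boolean progressed = true;
            for (int pass = 1; progressed; pass++) {
                progressed = false;
                for (Map.Entry<Integer, Iterator<Integer>> e : cursors.entrySet()) {
                    if (e.getValue().hasNext()) {
                        System.out.println("pass " + pass + ": partition " + e.getKey()
                                + " -> node " + e.getValue().next());
                        progressed = true;
                    }
                }
            }
            // pass 1 visits replica 0 of partitions 0 and 1 (both node 10);
            // pass 2 visits replica 1 of each (both node 11).
        }
    }
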
@@ -159,22 +173,40 @@ private static Map<Integer, Integer> getOrphanedReplicas(
return orphanedReplicas;
}

private static List<Node> nodesByLeastLoaded(Collection<Node> nodes) {
        List<Node> nodeList = new ArrayList<Node>(nodes);

        // Sort ascending by current replica count. Collections.sort is stable, so
        // equally loaded nodes keep their incoming order (ascending node id at the
        // call site, since the values come from a SortedMap).
        Collections.sort(nodeList, new Comparator<Node>() {
            public int compare(Node o1, Node o2) {
                return Integer.compare(
                        o1.assignedPartitions.size(), o2.assignedPartitions.size());
            }
        });
return nodeList;
}
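
To see the resulting order concretely, a quick self-contained check (FakeNode stands in for the real Node class):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    class LeastLoadedSketch {
        static final class FakeNode {
            final int id;
            final Set<Integer> assignedPartitions = new TreeSet<Integer>();
            FakeNode(int id, int load) {
                this.id = id;
                for (int p = 0; p < load; p++) {
                    assignedPartitions.add(p);
                }
            }
        }

        public static void main(String[] args) {
            // Node 10 carries 3 replicas; nodes 11 and 12 carry 1 each.
            List<FakeNode> nodes = new ArrayList<FakeNode>(Arrays.asList(
                    new FakeNode(10, 3), new FakeNode(11, 1), new FakeNode(12, 1)));
            // Same comparison as nodesByLeastLoaded: ascending replica count.
            Collections.sort(nodes, new Comparator<FakeNode>() {
                public int compare(FakeNode a, FakeNode b) {
                    return Integer.compare(a.assignedPartitions.size(),
                            b.assignedPartitions.size());
                }
            });
            for (FakeNode n : nodes) {
                System.out.println("node " + n.id + " load=" + n.assignedPartitions.size());
            }
            // prints 11, then 12 (stable tie at load 1), then 10.
        }
    }
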

private static void assignOrphans(
String topicName, SortedMap<Integer, Node> nodeMap,
Map<Integer, Integer> orphanedReplicas) {
// Don't process nodes in the same order for all topics to ensure that topics with fewer
// replicas than nodes are equally likely to be assigned anywhere (and not overload the
// brokers with earlier IDs).
Integer[] nodeProcessingOrder = getNodeProcessingOrder(topicName, nodeMap.keySet());
List<Integer> nodeProcessingOrderList = Arrays.asList(nodeProcessingOrder);

// Assign unassigned replicas to nodes that can accept them
for (Map.Entry<Integer, Integer> e : orphanedReplicas.entrySet()) {
int partition = e.getKey();
int remainingReplicas = e.getValue();
Iterator<Integer> nodeIt = nodeProcessingOrderList.iterator();
            List<Node> nodeList = nodesByLeastLoaded(nodeMap.values());
            Iterator<Node> nodeIt = nodeList.iterator();
while (nodeIt.hasNext() && remainingReplicas > 0) {
Node node = nodeMap.get(nodeIt.next());
Node node = nodeIt.next();
if (node.canAccept(partition)) {
node.accept(partition);
remainingReplicas--;
@@ -317,6 +349,22 @@ public Node(int id, int capacity, Rack rack) {
this.assignedPartitions = Sets.newTreeSet();
}

        /**
         * @param partition the partition id
         * @param withCapacity the per-node replica capacity to enforce
         * @return whether the given partition should remain hosted by this broker under
         * that capacity, or should instead move to a different broker.
         */
public boolean canRetainPartition(int partition, int withCapacity) {
return !assignedPartitions.contains(partition) &&
(assignedPartitions.size() < withCapacity) &&
rack.canAccept(partition);
}
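
Note that canAccept below is exactly this predicate with withCapacity fixed to the node's configured capacity, so the relationship could be expressed directly (a possible refactoring sketch, not part of this patch):

        // Illustrative only: canAccept is the withCapacity == this.capacity
        // special case of canRetainPartition.
        public boolean canAccept(int partition) {
            return canRetainPartition(partition, this.capacity);
        }
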

        /**
         * @param partition the partition id
         * @return whether this broker can host the given partition, assuming it does not
         * already hold a replica of it.
         */
public boolean canAccept(int partition) {
return !assignedPartitions.contains(partition) &&
assignedPartitions.size() < capacity &&
                    rack.canAccept(partition);
        }
30 changes: 30 additions & 0 deletions src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
@@ -3,6 +3,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Set;

import com.google.common.collect.ImmutableList;
@@ -81,15 +82,44 @@ public void testClusterExpansion() {
}
}

@Test
public void testClusterRebalance() {
String topic = "test";
        // We have 3 brokers {10, 11, 12} and a topic with 6 partitions; each partition
        // is replicated twice, for a total load of 12 replicas.
        // Brokers 10 and 11 each hold a replica of every partition, while broker 12
        // holds none. Generate an even assignment of these partitions, in which each
        // broker ends up with four of the 12 replicas.
Map<Integer, List<Integer>> currentAssignment = new HashMap<Integer, List<Integer>>();
        currentAssignment.put(0, ImmutableList.of(10, 11));
        currentAssignment.put(1, ImmutableList.of(10, 11));
        currentAssignment.put(2, ImmutableList.of(10, 11));
        currentAssignment.put(3, ImmutableList.of(10, 11));
        currentAssignment.put(4, ImmutableList.of(10, 11));
        currentAssignment.put(5, ImmutableList.of(10, 11));

        Set<Integer> brokers = ImmutableSet.of(10, 11, 12);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, brokers, Collections.<Integer, String>emptyMap(), 2);
Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
        for (Map.Entry<Integer, Integer> brokerCount : brokerReplicaCounts.entrySet()) {
Assert.assertEquals(4, (int) brokerCount.getValue());
}
}
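
One outcome those assertions would accept (hypothetical; assuming the trailing 1 passed to the verify helper caps replica movement per partition, the assigner is free to produce any equally balanced result):

        // partition -> replicas     resulting broker loads: 10 -> 4, 11 -> 4, 12 -> 4
        // 0 -> [10, 11]    1 -> [10, 11]
        // 2 -> [12, 11]    3 -> [12, 11]
        // 4 -> [10, 12]    5 -> [10, 12]

Each broker ends with exactly four replicas, and no partition loses more than one of its original two.
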

@Test
public void testDecommission() {
String topic = "test";
        // We start with 4 brokers {10, 11, 12, 13} ...
Map<Integer, List<Integer>> currentAssignment = ImmutableMap.of(
0, (List<Integer>) ImmutableList.of(10, 11),
1, ImmutableList.of(11, 12),
2, ImmutableList.of(12, 13),
3, ImmutableList.of(13, 10)
);
        // ... and decommission one of them (12), so the new broker set is {10, 11, 13}.
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(