Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<groupId>siftscience</groupId>
<artifactId>kafka-assigner</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<version>1.1</version>

<name>kafka-assigner</name>
<description>Tools for reassigning Kafka partitions with minimal movement</description>
Expand Down Expand Up @@ -130,4 +130,4 @@
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ public class KafkaAssignmentGenerator {
usage = "comma-separated list of topics")
private String topics = null;

@Option(name = "--desired_replication_factor",
usage = "used for changing replication factor for topics, if not present it will use the existing number")
private int desiredReplicationFactor = -1;

@Option(name = "--disable_rack_awareness",
usage = "set to true to ignore rack configurations")
private boolean disableRackAwareness = false;
Expand Down Expand Up @@ -126,7 +130,7 @@ private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException {

private static void printLeastDisruptiveReassignment(
ZkUtils zkUtils, List<String> specifiedTopics, Set<Integer> specifiedBrokers,
Set<Integer> excludedBrokers, Map<Integer, String> rackAssignment)
Set<Integer> excludedBrokers, Map<Integer, String> rackAssignment, int desiredReplicationFactor)
throws JSONException {
// We need three inputs for rebalacing: the brokers, the topics, and the current assignment
// of topics to brokers.
Expand Down Expand Up @@ -169,7 +173,7 @@ public Integer apply(Broker broker) {
for (String topic : JavaConversions.seqAsJavaList(topics)) {
Map<Integer, List<Integer>> partitionAssignment = initialAssignments.get(topic);
Map<Integer, List<Integer>> finalAssignment = assigner.generateAssignment(
topic, partitionAssignment, brokers, rackAssignment);
topic, partitionAssignment, brokers, rackAssignment, desiredReplicationFactor);
for (Map.Entry<Integer, List<Integer>> e : finalAssignment.entrySet()) {
JSONObject partitionJson = new JSONObject();
partitionJson.put("topic", topic);
Expand Down Expand Up @@ -284,7 +288,7 @@ private void runTool(String[] args) throws JSONException {
break;
case PRINT_REASSIGNMENT:
printLeastDisruptiveReassignment(zkUtils, topics, brokerIdSet,
excludedBrokerIdSet, rackAssignment);
excludedBrokerIdSet, rackAssignment, desiredReplicationFactor);
break;
default:
throw new UnsupportedOperationException("Invalid mode: " + mode);
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,26 @@ public KafkaTopicAssigner() {
* in each list is the "leader" replica for a partition.
* @param brokers a list of broker IDs as strings
* @param rackAssignment a map from broker ID to rack ID if a rack is defined for that broker
* @param desiredReplicationFactor used to change replication factor, use -1 to keep the same as
* the original topic
* @return the new assignment: a map from partition ID to ordered list of broker IDs
*/
public Map<Integer, List<Integer>> generateAssignment(
String topic, Map<Integer, List<Integer>> currentAssignment, Set<Integer> brokers,
Map<Integer, String> rackAssignment) {
Map<Integer, String> rackAssignment, int desiredReplicationFactor) {
// We need to do 2 things:
// - Get the set of partitions as integers
// - Figure out the replication factor (which should be the same for each partition)
int replicationFactor = -1;
// if desiredReplicationFactor is negative
int replicationFactor = desiredReplicationFactor;
Set<Integer> partitions = Sets.newTreeSet();
for (Map.Entry<Integer, List<Integer>> entry : currentAssignment.entrySet()) {
int partition = entry.getKey();
List<Integer> replicas = entry.getValue();
partitions.add(partition);
if (replicationFactor < 0) {
replicationFactor = replicas.size();
} else {
} else if (desiredReplicationFactor < 0) {
Preconditions.checkState(replicationFactor == replicas.size(),
"Topic " + topic + " has partition " + partition +
" with unexpected replication factor " + replicas.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testRackAwareExpansion() {
);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, rackAssignments);
topic, currentAssignment, newBrokers, rackAssignments, -1);

Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testClusterExpansion() {
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 12, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap());
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap(), -1);

Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
Expand All @@ -93,7 +93,7 @@ public void testDecommission() {
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap());
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap(), -1);

Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testReplacement() {
Set<Integer> newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map<Integer, List<Integer>> newAssignment = assigner.generateAssignment(
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap());
topic, currentAssignment, newBrokers, Collections.<Integer, String>emptyMap(),-1);

// run basic sanity checks
Map<Integer, Integer> brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
Expand Down