diff --git a/pom.xml b/pom.xml
index c495f0a..399cd43 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
siftscience
kafka-assigner
jar
- 1.0
+ 1.1
kafka-assigner
Tools for reassigning Kafka partitions with minimal movement
@@ -130,4 +130,4 @@
-
\ No newline at end of file
+
diff --git a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java
index 05b9abf..9375154 100644
--- a/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java
+++ b/src/main/java/siftscience/kafka/tools/KafkaAssignmentGenerator.java
@@ -75,6 +75,10 @@ public class KafkaAssignmentGenerator {
usage = "comma-separated list of topics")
private String topics = null;
+ @Option(name = "--desired_replication_factor",
+ usage = "used for changing replication factor for topics, if not present it will use the existing number")
+ private int desiredReplicationFactor = -1;
+
@Option(name = "--disable_rack_awareness",
usage = "set to true to ignore rack configurations")
private boolean disableRackAwareness = false;
@@ -126,7 +130,7 @@ private static void printCurrentBrokers(ZkUtils zkUtils) throws JSONException {
private static void printLeastDisruptiveReassignment(
ZkUtils zkUtils, List specifiedTopics, Set specifiedBrokers,
- Set excludedBrokers, Map rackAssignment)
+ Set excludedBrokers, Map rackAssignment, int desiredReplicationFactor)
throws JSONException {
// We need three inputs for rebalacing: the brokers, the topics, and the current assignment
// of topics to brokers.
@@ -169,7 +173,7 @@ public Integer apply(Broker broker) {
for (String topic : JavaConversions.seqAsJavaList(topics)) {
Map> partitionAssignment = initialAssignments.get(topic);
Map> finalAssignment = assigner.generateAssignment(
- topic, partitionAssignment, brokers, rackAssignment);
+ topic, partitionAssignment, brokers, rackAssignment, desiredReplicationFactor);
for (Map.Entry> e : finalAssignment.entrySet()) {
JSONObject partitionJson = new JSONObject();
partitionJson.put("topic", topic);
@@ -284,7 +288,7 @@ private void runTool(String[] args) throws JSONException {
break;
case PRINT_REASSIGNMENT:
printLeastDisruptiveReassignment(zkUtils, topics, brokerIdSet,
- excludedBrokerIdSet, rackAssignment);
+ excludedBrokerIdSet, rackAssignment, desiredReplicationFactor);
break;
default:
throw new UnsupportedOperationException("Invalid mode: " + mode);
diff --git a/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java b/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java
index f34dd50..5b5211e 100644
--- a/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java
+++ b/src/main/java/siftscience/kafka/tools/KafkaTopicAssigner.java
@@ -35,15 +35,18 @@ public KafkaTopicAssigner() {
* in each list is the "leader" replica for a partition.
* @param brokers a list of broker IDs as strings
* @param rackAssignment a map from broker ID to rack ID if a rack is defined for that broker
+ * @param desiredReplicationFactor the replication factor to apply to the new assignment;
+ *                                 pass -1 to keep the original topic's replication factor
* @return the new assignment: a map from partition ID to ordered list of broker IDs
*/
public Map> generateAssignment(
String topic, Map> currentAssignment, Set brokers,
- Map rackAssignment) {
+ Map rackAssignment, int desiredReplicationFactor) {
// We need to do 2 things:
// - Get the set of partitions as integers
// - Figure out the replication factor (which should be the same for each partition)
- int replicationFactor = -1;
+ // if desiredReplicationFactor is negative
+ int replicationFactor = desiredReplicationFactor;
Set partitions = Sets.newTreeSet();
for (Map.Entry> entry : currentAssignment.entrySet()) {
int partition = entry.getKey();
@@ -51,7 +54,7 @@ public Map> generateAssignment(
partitions.add(partition);
if (replicationFactor < 0) {
replicationFactor = replicas.size();
- } else {
+ } else if (desiredReplicationFactor < 0) {
Preconditions.checkState(replicationFactor == replicas.size(),
"Topic " + topic + " has partition " + partition +
" with unexpected replication factor " + replicas.size());
diff --git a/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java b/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
index 8c63d68..1d56448 100644
--- a/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
+++ b/src/test/java/siftscience/kafka/tools/KafkaTopicAssignerTest.java
@@ -35,7 +35,7 @@ public void testRackAwareExpansion() {
);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map> newAssignment = assigner.generateAssignment(
- topic, currentAssignment, newBrokers, rackAssignments);
+ topic, currentAssignment, newBrokers, rackAssignments, -1);
Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
@@ -68,7 +68,7 @@ public void testClusterExpansion() {
Set newBrokers = ImmutableSet.of(10, 11, 12, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map> newAssignment = assigner.generateAssignment(
- topic, currentAssignment, newBrokers, Collections.emptyMap());
+ topic, currentAssignment, newBrokers, Collections.emptyMap(), -1);
Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
@@ -93,7 +93,7 @@ public void testDecommission() {
Set newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map> newAssignment = assigner.generateAssignment(
- topic, currentAssignment, newBrokers, Collections.emptyMap());
+ topic, currentAssignment, newBrokers, Collections.emptyMap(), -1);
Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(
currentAssignment, newAssignment, 1);
@@ -133,7 +133,7 @@ public void testReplacement() {
Set newBrokers = ImmutableSet.of(10, 11, 13);
KafkaTopicAssigner assigner = new KafkaTopicAssigner();
Map> newAssignment = assigner.generateAssignment(
- topic, currentAssignment, newBrokers, Collections.emptyMap());
+ topic, currentAssignment, newBrokers, Collections.emptyMap(), -1);
// run basic sanity checks
Map brokerReplicaCounts = verifyPartitionsAndBuildReplicaCounts(