Skip to content

Commit 6b41cda

Browse files
tombentleytinaselenge
authored andcommitted
Proof of concept implementation for KafkaRoller 2.0
Signed-off-by: Gantigmaa Selenge <[email protected]>
1 parent ddea7f6 commit 6b41cda

35 files changed

+4549
-27
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/ClusterOperatorConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,11 @@ public class ClusterOperatorConfig {
145145
*/
146146
public static final ConfigParameter<Long> OPERATION_TIMEOUT_MS = new ConfigParameter<>("STRIMZI_OPERATION_TIMEOUT_MS", LONG, "300000", CONFIG_VALUES);
147147

148+
/**
149+
* The maximum number of broker nodes that can be restarted at once
150+
*/
151+
public static final ConfigParameter<Integer> MAX_RESTART_BATCH_SIZE = new ConfigParameter<>("STRIMZI_MAX_RESTART_BATCH_SIZE", INTEGER, "1", CONFIG_VALUES);
152+
148153
/**
149154
* Timeout used to wait for a Kafka Connect builds to finish
150155
*/
@@ -458,6 +463,13 @@ public long getOperationTimeoutMs() {
458463
return get(OPERATION_TIMEOUT_MS);
459464
}
460465

466+
/**
467+
* @return how many broker nodes can be restarted in parallel
468+
*/
469+
public int getMaxRestartBatchSize() {
470+
return get(MAX_RESTART_BATCH_SIZE);
471+
}
472+
461473
/**
462474
* @return How many milliseconds should we wait for Kafka Connect build to complete
463475
*/

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/RestartReasons.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,4 +146,12 @@ public int hashCode() {
146146
public String toString() {
147147
return reasons.keySet().toString();
148148
}
149+
150+
/**
151+
* @param reason The reason to test.
152+
* @return true if these reasons are just the single given reason.
153+
*/
154+
public boolean isSingletonOf(RestartReason reason) {
155+
return reasons.size() == 1 && reasons.containsKey(reason);
156+
}
149157
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import io.strimzi.operator.cluster.model.RestartReasons;
4949
import io.strimzi.operator.cluster.operator.resource.ConcurrentDeletionException;
5050
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClientProvider;
51-
import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
51+
//import io.strimzi.operator.cluster.operator.resource.KafkaRoller;
5252
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
5353
import io.strimzi.operator.cluster.operator.resource.events.KubernetesRestartEventPublisher;
5454
import io.strimzi.operator.cluster.operator.resource.kubernetes.ClusterRoleBindingOperator;
@@ -66,9 +66,10 @@
6666
import io.strimzi.operator.cluster.operator.resource.kubernetes.ServiceOperator;
6767
import io.strimzi.operator.cluster.operator.resource.kubernetes.StorageClassOperator;
6868
import io.strimzi.operator.cluster.operator.resource.kubernetes.StrimziPodSetOperator;
69+
import io.strimzi.operator.cluster.operator.resource.rolling.RackRolling;
6970
import io.strimzi.operator.common.AdminClientProvider;
7071
import io.strimzi.operator.common.Annotations;
71-
import io.strimzi.operator.common.BackOff;
72+
//import io.strimzi.operator.common.e;
7273
import io.strimzi.operator.common.Reconciliation;
7374
import io.strimzi.operator.common.ReconciliationLogger;
7475
import io.strimzi.operator.common.Util;
@@ -115,6 +116,7 @@ public class KafkaReconciler {
115116

116117
// Various settings
117118
private final long operationTimeoutMs;
119+
private final int maxRestartBatchSize;
118120
private final boolean isNetworkPolicyGeneration;
119121
private final boolean isPodDisruptionBudgetGeneration;
120122
private final boolean isKafkaNodePoolsEnabled;
@@ -197,6 +199,7 @@ public KafkaReconciler(
197199
this.reconciliation = reconciliation;
198200
this.vertx = vertx;
199201
this.operationTimeoutMs = config.getOperationTimeoutMs();
202+
this.maxRestartBatchSize = config.getMaxRestartBatchSize();
200203
this.kafkaNodePoolCrs = nodePools;
201204
this.kafka = kafka;
202205

@@ -464,23 +467,47 @@ protected Future<Void> maybeRollKafka(
464467
Map<Integer, Map<String, String>> kafkaAdvertisedPorts,
465468
boolean allowReconfiguration
466469
) {
467-
return new KafkaRoller(
468-
reconciliation,
469-
vertx,
470-
podOperator,
471-
1_000,
472-
operationTimeoutMs,
473-
() -> new BackOff(250, 2, 10),
474-
nodes,
475-
this.coTlsPemIdentity,
476-
adminClientProvider,
477-
kafkaAgentClientProvider,
478-
brokerId -> kafka.generatePerBrokerConfiguration(brokerId, kafkaAdvertisedHostnames, kafkaAdvertisedPorts),
479-
logging,
480-
kafka.getKafkaVersion(),
481-
allowReconfiguration,
482-
eventsPublisher
483-
).rollingRestart(podNeedsRestart);
470+
Function<Integer, String> kafkaConfigProvider = nodeId -> kafka.generatePerBrokerConfiguration(nodeId, kafkaAdvertisedHostnames, kafkaAdvertisedPorts);
471+
//TODO: Change this logic to run the new roller if the feature gate for it is enabled (also add feature gate).
472+
473+
var rr = RackRolling.rollingRestart(
474+
podOperator,
475+
nodes,
476+
reconciliation,
477+
// Remap the function from pod to RestartReasons to nodeId to RestartReasons
478+
nodeId -> podNeedsRestart.apply(podOperator.get(reconciliation.namespace(), nodes.stream().filter(nodeRef -> nodeRef.nodeId() == nodeId).collect(Collectors.toList()).get(0).podName())),
479+
this.coTlsPemIdentity,
480+
adminClientProvider,
481+
kafkaAgentClientProvider,
482+
kafkaConfigProvider,
483+
allowReconfiguration,
484+
kafka.getKafkaVersion(),
485+
logging,
486+
operationTimeoutMs,
487+
maxRestartBatchSize,
488+
3,
489+
3,
490+
10,
491+
eventsPublisher);
492+
493+
return rr.executeRollingAsync(vertx);
494+
// return new KafkaRoller(
495+
// reconciliation,
496+
// vertx,
497+
// podOperator,
498+
// 1_000,
499+
// operationTimeoutMs,
500+
// () -> new BackOff(250, 2, 10),
501+
// nodes,
502+
// this.coTlsPemIdentity,
503+
// adminClientProvider,
504+
// kafkaAgentClientProvider,
505+
// kafkaConfigProvider,
506+
// logging,
507+
// kafka.getKafkaVersion(),
508+
// allowReconfiguration,
509+
// eventsPublisher
510+
// ).rollingRestart(podNeedsRestart);
484511
}
485512

486513
/**

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/BrokerState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
/**
1313
* Java representation of the JSON response from the /v1/broker-state endpoint of the KafkaAgent
1414
*/
15-
class BrokerState {
15+
public class BrokerState {
1616
private static final int BROKER_RECOVERY_STATE = 2;
1717

1818
private final int code;

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerConfigurationDiff.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,11 @@ public class KafkaBrokerConfigurationDiff extends AbstractJsonDiff {
7474
* @param kafkaVersion Kafka version
7575
* @param brokerNodeRef Broker node reference
7676
*/
77-
protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired, KafkaVersion kafkaVersion, NodeRef brokerNodeRef) {
77+
public KafkaBrokerConfigurationDiff(Reconciliation reconciliation,
78+
Config brokerConfigs,
79+
String desired,
80+
KafkaVersion kafkaVersion,
81+
NodeRef brokerNodeRef) {
7882
this.reconciliation = reconciliation;
7983
this.configModel = KafkaConfiguration.readConfigModel(kafkaVersion);
8084
this.brokerConfigDiff = diff(brokerNodeRef, desired, brokerConfigs, configModel);
@@ -83,7 +87,7 @@ protected KafkaBrokerConfigurationDiff(Reconciliation reconciliation, Config bro
8387
/**
8488
* @return Returns true if the configuration can be updated dynamically
8589
*/
86-
protected boolean canBeUpdatedDynamically() {
90+
public boolean canBeUpdatedDynamically() {
8791
boolean result = true;
8892
for (AlterConfigOp entry : brokerConfigDiff) {
8993
if (isEntryReadOnly(entry.configEntry())) {
@@ -107,7 +111,7 @@ private boolean isEntryReadOnly(ConfigEntry entry) {
107111
* Returns configuration difference
108112
* @return Collection of AlterConfigOp containing difference between current and desired configuration
109113
*/
110-
protected Collection<AlterConfigOp> getConfigDiff() {
114+
public Collection<AlterConfigOp> getConfigDiff() {
111115
return brokerConfigDiff;
112116
}
113117

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaBrokerLoggingConfigurationDiff.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class KafkaBrokerLoggingConfigurationDiff extends AbstractJsonDiff {
3838
* @param brokerConfigs Current broker configuration from Kafka Admin API
3939
* @param desired Desired logging configuration
4040
*/
41-
protected KafkaBrokerLoggingConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired) {
41+
public KafkaBrokerLoggingConfigurationDiff(Reconciliation reconciliation, Config brokerConfigs, String desired) {
4242
this.reconciliation = reconciliation;
4343
this.diff = diff(desired, brokerConfigs);
4444
}
@@ -47,7 +47,7 @@ protected KafkaBrokerLoggingConfigurationDiff(Reconciliation reconciliation, Con
4747
* Returns logging difference
4848
* @return Collection of AlterConfigOp containing difference between current and desired logging configuration
4949
*/
50-
protected Collection<AlterConfigOp> getLoggingDiff() {
50+
public Collection<AlterConfigOp> getLoggingDiff() {
5151
return diff;
5252
}
5353

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/kubernetes/PodOperator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public Future<Void> restart(Reconciliation reconciliation, Pod pod, long timeout
5151
String namespace = pod.getMetadata().getNamespace();
5252
String podName = pod.getMetadata().getName();
5353
Promise<Void> deleteFinished = Promise.promise();
54-
LOGGER.infoCr(reconciliation, "Rolling pod {}", podName);
54+
LOGGER.infoCr(reconciliation, "Deleting pod {}", podName);
5555

5656
// Determine generation of deleted pod
5757
String deleted = getPodUid(pod);
@@ -66,7 +66,7 @@ public Future<Void> restart(Reconciliation reconciliation, Pod pod, long timeout
6666
boolean done = !deleted.equals(newUid);
6767

6868
if (done) {
69-
LOGGER.debugCr(reconciliation, "Rolling pod {} finished", podName);
69+
LOGGER.debugCr(reconciliation, "Deleting pod {} finished", podName);
7070
}
7171

7272
return done;
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.operator.resource.rolling;
6+
7+
import io.strimzi.operator.cluster.model.NodeRef;
8+
9+
/**
10+
* An abstraction over a KafkaAgent client.
11+
*/
12+
interface AgentClient {
13+
14+
/** @return The broker state, according to the Kafka Agent */
15+
BrokerState getBrokerState(NodeRef nodeRef);
16+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.operator.resource.rolling;
6+
7+
import io.strimzi.operator.cluster.model.NodeRef;
8+
import io.strimzi.operator.cluster.operator.resource.KafkaAgentClient;
9+
10+
class AgentClientImpl implements AgentClient {
11+
private final KafkaAgentClient kafkaAgentClient;
12+
13+
AgentClientImpl(KafkaAgentClient kafkaAgentClient) {
14+
this.kafkaAgentClient = kafkaAgentClient;
15+
16+
}
17+
18+
@Override
19+
public BrokerState getBrokerState(NodeRef nodeRef) {
20+
var result = kafkaAgentClient.getBrokerState(nodeRef.podName());
21+
BrokerState brokerState = BrokerState.fromValue((byte) result.code());
22+
brokerState.setRemainingSegmentsToRecover(result.remainingSegmentsToRecover());
23+
brokerState.setRemainingLogsToRecover(result.remainingLogsToRecover());
24+
return brokerState;
25+
}
26+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.operator.cluster.operator.resource.rolling;
6+
7+
import io.strimzi.operator.common.UncheckedInterruptedException;
8+
9+
import java.util.concurrent.TimeoutException;
10+
import java.util.function.BooleanSupplier;
11+
import java.util.function.Supplier;
12+
13+
/**
14+
* Timing utility for polling loops which allows to set an alarm (in terms of a duration from "now") and
15+
* subsequently sleep the executing thread. If the alarm duration is exceeded the call to sleep will throw a
16+
* {@link TimeoutException}. This can be used to simplify writing polling logic like the following
17+
* <pre>{@code
18+
* long timeoutMs = 60_000
19+
* long pollIntervalMs = 1_000;
20+
* Alarm alarm = Alarm.start(time, timeoutMs);
21+
* while (true) {
22+
* // do some processing
23+
* if (processingSuccess) {
24+
* timeoutMs = alarm.remainingMs();
25+
* // we might want to use the remaining timeout when
26+
* // a single timeout is used for a sequence of polling tasks
27+
* break;
28+
* }
29+
* alarm.sleep(pollIntervalMs);
30+
* }
31+
* }</pre>
32+
* This logic is encapsulated in the {@link #poll(long, BooleanSupplier)} method.
33+
*/
34+
public class Alarm {
35+
36+
final Time time;
37+
final long deadline;
38+
private final Supplier<String> timeoutMessageSupplier;
39+
40+
private Alarm(Time time, long deadline, Supplier<String> timeoutMessageSupplier) {
41+
this.time = time;
42+
this.deadline = deadline;
43+
this.timeoutMessageSupplier = timeoutMessageSupplier;
44+
}
45+
46+
/**
47+
* Creates an Alerm
48+
* @param time The source of time
49+
* @param timeoutMs The timeout for this alarm.
50+
* @param timeoutMessageSupplier The exception message
51+
* @return The alarm
52+
*/
53+
public static Alarm timer(Time time, long timeoutMs, Supplier<String> timeoutMessageSupplier) {
54+
if (timeoutMs < 0) {
55+
throw new IllegalArgumentException();
56+
}
57+
long deadline = time.nanoTime() + 1_000_000 * timeoutMs;
58+
return new Alarm(time, deadline, timeoutMessageSupplier);
59+
}
60+
61+
/**
62+
* @return The remaining number of milliseconds until the deadline passed
63+
*/
64+
public long remainingMs() {
65+
return Math.max(deadline - time.nanoTime(), 0) / 1_000_000L;
66+
}
67+
68+
/**
69+
* Sleep the current thread for at most at least {@code ms} milliseconds, according to
70+
* (and subject to the precision and accuracy of) the configured {@link Time} instance.
71+
* The actual sleep time will be less than {@code ms} if using {@code ms} would exceed this
72+
* alarm's deadline.
73+
* The thread does not lose ownership of any monitors.
74+
* @param ms The number of milliseconds to sleep for.
75+
* @throws TimeoutException If the Alarm's deadline has passed
76+
* @throws InterruptedException If the current thread is interrupted
77+
*/
78+
public void sleep(long ms) throws TimeoutException, InterruptedException {
79+
if (ms < 0) {
80+
throw new IllegalArgumentException();
81+
}
82+
long sleepNs = Math.min(1_000_000L * ms, deadline - time.nanoTime());
83+
if (sleepNs <= 0) {
84+
throw new TimeoutException(timeoutMessageSupplier.get());
85+
}
86+
time.sleep(sleepNs / 1_000_000L, (int) (sleepNs % 1_000_000L));
87+
}
88+
89+
/**
90+
* Test {@code done} at least once, returning when it returns true, and otherwise sleeping for at most approximately
91+
* {@code pollIntervalMs} before repeating, throwing {@link TimeoutException} should this
92+
* alarm expire before {@code done} returns true.
93+
*
94+
* @param pollIntervalMs The polling interval
95+
* @param done A predicate function to detecting when the polling loop is complete.
96+
* @return The remaining time left for this alarm, in ms.
97+
* @throws UncheckedInterruptedException The thread was interrupted
98+
* @throws TimeoutException The {@link #remainingMs()} has reached zero.
99+
*/
100+
public long poll(long pollIntervalMs, BooleanSupplier done) throws TimeoutException {
101+
if (pollIntervalMs <= 0) {
102+
throw new IllegalArgumentException();
103+
}
104+
try {
105+
while (true) {
106+
if (done.getAsBoolean()) {
107+
return this.remainingMs();
108+
}
109+
this.sleep(pollIntervalMs);
110+
}
111+
} catch (InterruptedException e) {
112+
throw new UncheckedInterruptedException(e);
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)