Skip to content

Commit 9895074

Browse files
committed
Add version field to KafkaConnector and KafkaMirrorMaker2 CR
Signed-off-by: Kate Stanley <[email protected]>
1 parent 089ecbb commit 9895074

File tree

14 files changed

+416
-8
lines changed

14 files changed

+416
-8
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
* New way of defining the target (`.spec.target`) and source clusters (`.spec.mirrors[].source`) in the `KafkaMirrorMaker2` custom resources.
1111
* Strimzi Access Operator installation files updated to version 0.2.0
1212
* New feature gate `UseConnectBuildWithBuildah` (disabled by default) for running the Connect Build feature with Buildah instead of Kaniko on Kubernetes - according to [Strimzi Proposal #114](https://github.com/strimzi/proposals/blob/main/114-use-buildah-instead-of-kaniko.md).
13+
* New field `spec.version` in the `KafkaConnecter` custom resource, and new fields `spec.mirrors[].sourceConnector.version`, `spec.mirrors[].checkpointConnector.version`, and `spec.mirrors[].heartbeatConnector.version` in the `KafkaMirrorMaker2` custom resource for configuring the desired version of a connector.
1314

1415
### Major changes, deprecations, and removals
1516

@@ -30,6 +31,8 @@
3031
* The `.spec.build.output.additionalKanikoOptions` field in the `KafkaConnect` custom resource is deprecated and will be removed in the future.
3132
* Use `.spec.build.output.additionalBuildOptions` field instead.
3233
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
34+
* Configuring `connector.plugin.version` under `spec.config` in the `KafkaConnector` custom resource, and under `spec.mirrors[].sourceConnector.config`, `spec.mirrors[].checkpointConnector.config`, and `spec.mirrors[].heartbeatConnector.config` in the `KafkaMirrorMaker2` custom resource is deprecated and will be forbidden in the future.
35+
* Use `spec.version` in the `KafkaConnecter` custom resource, and `spec.mirrors[].sourceConnector.version`, `spec.mirrors[].checkpointConnector.version`, and `spec.mirrors[].heartbeatConnector.version` in the `KafkaMirrorMaker2` custom resource.
3336

3437
## 0.48.0
3538

api/src/main/java/io/strimzi/api/kafka/model/connector/AbstractConnectorSpec.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
builderPackage = Constants.FABRIC8_KUBERNETES_API
3030
)
3131
@JsonInclude(JsonInclude.Include.NON_NULL)
32-
@JsonPropertyOrder({"pause", "tasksMax", "config", "state", "listOffsets", "alterOffsets"})
32+
@JsonPropertyOrder({"pause", "tasksMax", "version", "config", "state", "listOffsets", "alterOffsets"})
3333
@EqualsAndHashCode(callSuper = true)
3434
@ToString(callSuper = true)
3535
public abstract class AbstractConnectorSpec extends Spec {
@@ -40,6 +40,7 @@ public abstract class AbstractConnectorSpec extends Spec {
4040

4141
private Integer tasksMax;
4242
private Boolean pause;
43+
private String version;
4344
private Map<String, Object> config = new HashMap<>(0);
4445
private ConnectorState state;
4546

@@ -66,6 +67,24 @@ public void setTasksMax(Integer tasksMax) {
6667
this.tasksMax = tasksMax;
6768
}
6869

70+
/**
71+
* @return Version or version range for the Kafka Connector
72+
*/
73+
@Description("Desired version or version range to respect when starting the Kafka Connector. This is only supported when using Kafka Connect version 4.1.0 and higher.")
74+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
75+
public String getVersion() {
76+
return version;
77+
}
78+
79+
/**
80+
* Sets the plugin version string for the Kafka Connector
81+
*
82+
* @param version Version or version range
83+
*/
84+
public void setVersion(String version) {
85+
this.version = version;
86+
}
87+
6988
/**
7089
* @return Connector configuration
7190
*/

api/src/main/java/io/strimzi/api/kafka/model/connector/KafkaConnectorSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
builderPackage = Constants.FABRIC8_KUBERNETES_API
1919
)
2020
@JsonInclude(JsonInclude.Include.NON_NULL)
21-
@JsonPropertyOrder({"class", "tasksMax", "autoRestart", "config", "pause", "state", "listOffsets", "alterOffsets"})
21+
@JsonPropertyOrder({"class", "tasksMax", "autoRestart", "version", "config", "pause", "state", "listOffsets", "alterOffsets"})
2222
@EqualsAndHashCode(callSuper = true)
2323
@ToString(callSuper = true)
2424
public class KafkaConnectorSpec extends AbstractConnectorSpec {

api/src/main/java/io/strimzi/api/kafka/model/mirrormaker2/KafkaMirrorMaker2ConnectorSpec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
builderPackage = Constants.FABRIC8_KUBERNETES_API
1818
)
1919
@JsonInclude(JsonInclude.Include.NON_NULL)
20-
@JsonPropertyOrder({"tasksMax", "pause", "config", "state", "autoRestart", "listOffsets", "alterOffsets"})
20+
@JsonPropertyOrder({"tasksMax", "pause", "version", "config", "state", "autoRestart", "listOffsets", "alterOffsets"})
2121
@EqualsAndHashCode(callSuper = true)
2222
@ToString(callSuper = true)
2323
public class KafkaMirrorMaker2ConnectorSpec extends AbstractConnectorSpec { }

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ public List<KafkaConnector> generateConnectorDefinitions() {
125125
.withState(mm2ConnectorSpec.getState())
126126
.withAutoRestart(mm2ConnectorSpec.getAutoRestart())
127127
.withTasksMax(mm2ConnectorSpec.getTasksMax())
128+
.withVersion(mm2ConnectorSpec.getVersion())
128129
.withListOffsets(mm2ConnectorSpec.getListOffsets())
129130
.withAlterOffsets(mm2ConnectorSpec.getAlterOffsets())
130131
.endSpec()

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -435,13 +435,21 @@ protected Future<Void> connectPodDisruptionBudget(Reconciliation reconciliation,
435435
protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient,
436436
String connectorName, KafkaConnectorSpec connectorSpec, CustomResource resource) {
437437
KafkaConnectorConfiguration desiredConfig = new KafkaConnectorConfiguration(reconciliation, connectorSpec.getConfig().entrySet());
438+
// In future connector.plugin.version will be added to forbidden list, for now add warning to conditions if specified
439+
List<Condition> initialConditions = new ArrayList<>();
440+
if (desiredConfig.getConfigOption("connector.plugin.version") != null) {
441+
String message = "Config option connector.plugin.version has been set under the config field. This is deprecated and will be forbidden in future. " +
442+
"Use version field instead.";
443+
LOGGER.warnCr(reconciliation, message);
444+
initialConditions.add(StatusUtils.buildWarningCondition("DeprecatedFields", message));
445+
}
438446

439447
return VertxUtil.completableFutureToVertxFuture(apiClient.getConnectorConfig(reconciliation, new BackOff(200L, 2, 6), host, port, connectorName)).compose(
440448
currentConfig -> {
441449
if (!needsReconfiguring(reconciliation, connectorName, connectorSpec, desiredConfig.asOrderedProperties().asMap(), currentConfig)) {
442450
LOGGER.debugCr(reconciliation, "Connector {} exists and has desired config, {}=={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
443451
return VertxUtil.completableFutureToVertxFuture(apiClient.status(reconciliation, host, port, connectorName))
444-
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, new ArrayList<>()))
452+
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, initialConditions))
445453
.compose(conditions -> manageConnectorOffsets(reconciliation, host, apiClient, connectorName, resource, connectorSpec, conditions))
446454
.compose(conditions -> maybeRestartConnector(reconciliation, host, apiClient, connectorName, resource, conditions))
447455
.compose(conditions -> maybeRestartConnectorTask(reconciliation, host, apiClient, connectorName, resource, conditions))
@@ -452,7 +460,7 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
452460
} else {
453461
LOGGER.debugCr(reconciliation, "Connector {} exists but does not have desired config, {}!={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
454462
return createOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, desiredConfig)
455-
.compose(createConnectorStatusAndConditions())
463+
.compose(createConnectorStatusAndConditions(initialConditions))
456464
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
457465
}
458466
},
@@ -461,7 +469,7 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
461469
&& ((ConnectRestException) error).getStatusCode() == 404) {
462470
LOGGER.debugCr(reconciliation, "Connector {} does not exist", connectorName);
463471
return createOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, desiredConfig)
464-
.compose(createConnectorStatusAndConditions())
472+
.compose(createConnectorStatusAndConditions(initialConditions))
465473
.compose(status -> autoRestartFailedConnectorAndTasks(reconciliation, host, apiClient, connectorName, connectorSpec, status, resource))
466474
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
467475
} else {
@@ -474,11 +482,14 @@ private boolean needsReconfiguring(Reconciliation reconciliation, String connect
474482
KafkaConnectorSpec connectorSpec,
475483
Map<String, String> desiredConfig,
476484
Map<String, String> actualConfig) {
477-
// The actual which comes from Connect API includes tasks.max, connector.class and name,
485+
// The actual which comes from Connect API includes tasks.max, connector.class, connector.plugin.version (if set) and name,
478486
// which connectorSpec.getConfig() does not
479487
if (connectorSpec.getTasksMax() != null) {
480488
desiredConfig.put("tasks.max", connectorSpec.getTasksMax().toString());
481489
}
490+
if (connectorSpec.getVersion() != null) {
491+
desiredConfig.put("connector.plugin.version", connectorSpec.getVersion());
492+
}
482493
desiredConfig.put("name", connectorName);
483494
desiredConfig.put("connector.class", connectorSpec.getClassName());
484495

@@ -1178,6 +1189,10 @@ private static JsonObject asJson(KafkaConnectorSpec spec, KafkaConnectorConfigur
11781189
connectorConfigJson.put("tasks.max", spec.getTasksMax());
11791190
}
11801191

1192+
if (spec.getVersion() != null) {
1193+
connectorConfigJson.put("connector.plugin.version", spec.getVersion());
1194+
}
1195+
11811196
return connectorConfigJson.put("connector.class", spec.getClassName());
11821197
}
11831198

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaMirrorMaker2AssemblyOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.fabric8.kubernetes.client.CustomResource;
1010
import io.fabric8.kubernetes.client.KubernetesClient;
1111
import io.strimzi.api.kafka.model.common.CertSecretSource;
12+
import io.strimzi.api.kafka.model.common.Condition;
1213
import io.strimzi.api.kafka.model.connector.AutoRestartStatus;
1314
import io.strimzi.api.kafka.model.connector.KafkaConnector;
1415
import io.strimzi.api.kafka.model.connector.KafkaConnectorSpec;
@@ -147,8 +148,12 @@ protected Future<KafkaMirrorMaker2Status> createOrUpdate(Reconciliation reconcil
147148
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : reconcileConnectors(reconciliation, kafkaMirrorMaker2, mirrorMaker2Cluster, kafkaMirrorMaker2Status))
148149
.map((Void) null)
149150
.onComplete(reconciliationResult -> {
151+
// Extract warning conditions from reconciliation
152+
List<Condition> warningConditions = kafkaMirrorMaker2Status.getConditions();
150153
StatusUtils.setStatusConditionAndObservedGeneration(kafkaMirrorMaker2, kafkaMirrorMaker2Status, reconciliationResult.cause());
151-
154+
if (warningConditions != null && !warningConditions.isEmpty()) {
155+
kafkaMirrorMaker2Status.addConditions(warningConditions);
156+
}
152157
if (!hasZeroReplicas) {
153158
kafkaMirrorMaker2Status.setUrl(KafkaMirrorMaker2Resources.url(mirrorMaker2Cluster.getCluster(), namespace, KafkaMirrorMaker2Cluster.REST_API_PORT));
154159
}

0 commit comments

Comments
 (0)