Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* New way of defining the target (`.spec.target`) and source clusters (`.spec.mirrors[].source`) in the `KafkaMirrorMaker2` custom resources.
* Strimzi Access Operator installation files updated to version 0.2.0
* New feature gate `UseConnectBuildWithBuildah` (disabled by default) for running the Connect Build feature with Buildah instead of Kaniko on Kubernetes - according to [Strimzi Proposal #114](https://github.com/strimzi/proposals/blob/main/114-use-buildah-instead-of-kaniko.md).
* New field `spec.version` in the `KafkaConnecter` custom resource, and new fields `spec.mirrors[].sourceConnector.version`, `spec.mirrors[].checkpointConnector.version`, and `spec.mirrors[].heartbeatConnector.version` in the `KafkaMirrorMaker2` custom resource for configuring the desired version of a connector.

### Major changes, deprecations, and removals

Expand All @@ -30,6 +31,8 @@
* The `.spec.build.output.additionalKanikoOptions` field in the `KafkaConnect` custom resource is deprecated and will be removed in the future.
* Use `.spec.build.output.additionalBuildOptions` field instead.
* Kafka nodes are now configured with PEM certificates instead of P12/JKS for keystore and truststore.
* Configuring `connector.plugin.version` under `spec.config` in the `KafkaConnector` custom resource, and under `spec.mirrors[].sourceConnector.config`, `spec.mirrors[].checkpointConnector.config`, and `spec.mirrors[].heartbeatConnector.config` in the `KafkaMirrorMaker2` custom resource is deprecated and will be forbidden in Strimzi 0.50.0.
Instead, please use `spec.version` in the `KafkaConnecter` custom resource, and the connector specific `version` fields in the `KafkaMirrorMaker2` custom resource.

## 0.48.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"pause", "tasksMax", "config", "state", "listOffsets", "alterOffsets"})
@JsonPropertyOrder({"pause", "tasksMax", "version", "config", "state", "listOffsets", "alterOffsets"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public abstract class AbstractConnectorSpec extends Spec {
Expand All @@ -40,6 +40,7 @@ public abstract class AbstractConnectorSpec extends Spec {

private Integer tasksMax;
private Boolean pause;
private String version;
private Map<String, Object> config = new HashMap<>(0);
private ConnectorState state;

Expand All @@ -66,6 +67,24 @@ public void setTasksMax(Integer tasksMax) {
this.tasksMax = tasksMax;
}

/**
* @return Version or version range for the Kafka Connector
*/
@Description("Desired version or version range to respect when starting the Kafka Connector. This is only supported when using Kafka Connect version 4.1.0 and higher.")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
public String getVersion() {
return version;
}

/**
* Sets the plugin version string for the Kafka Connector
*
* @param version Version or version range
*/
public void setVersion(String version) {
this.version = version;
}

/**
* @return Connector configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"class", "tasksMax", "autoRestart", "config", "pause", "state", "listOffsets", "alterOffsets"})
@JsonPropertyOrder({"class", "tasksMax", "autoRestart", "version", "config", "pause", "state", "listOffsets", "alterOffsets"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaConnectorSpec extends AbstractConnectorSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
builderPackage = Constants.FABRIC8_KUBERNETES_API
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({"tasksMax", "pause", "config", "state", "autoRestart", "listOffsets", "alterOffsets"})
@JsonPropertyOrder({"tasksMax", "pause", "version", "config", "state", "autoRestart", "listOffsets", "alterOffsets"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaMirrorMaker2ConnectorSpec extends AbstractConnectorSpec { }
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public List<KafkaConnector> generateConnectorDefinitions() {
.withState(mm2ConnectorSpec.getState())
.withAutoRestart(mm2ConnectorSpec.getAutoRestart())
.withTasksMax(mm2ConnectorSpec.getTasksMax())
.withVersion(mm2ConnectorSpec.getVersion())
.withListOffsets(mm2ConnectorSpec.getListOffsets())
.withAlterOffsets(mm2ConnectorSpec.getAlterOffsets())
.endSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,22 @@ protected Future<Void> connectPodDisruptionBudget(Reconciliation reconciliation,
protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reconciliation reconciliation, String host, KafkaConnectApi apiClient,
String connectorName, KafkaConnectorSpec connectorSpec, CustomResource resource) {
KafkaConnectorConfiguration desiredConfig = new KafkaConnectorConfiguration(reconciliation, connectorSpec.getConfig().entrySet());
// In Strimzi 0.50.0 connector.plugin.version will be added to forbidden list, for now add warning to conditions if specified
// Work for removing this is tracked in https://github.com/strimzi/strimzi-kafka-operator/issues/12027
List<Condition> initialConditions = new ArrayList<>();
if (desiredConfig.getConfigOption("connector.plugin.version") != null) {
String message = "Config option connector.plugin.version has been set under the config field. This is deprecated and will be forbidden in future. " +
"Use version field instead.";
LOGGER.warnCr(reconciliation, message);
initialConditions.add(StatusUtils.buildWarningCondition("DeprecatedFields", message));
}

return VertxUtil.completableFutureToVertxFuture(apiClient.getConnectorConfig(reconciliation, new BackOff(200L, 2, 6), host, port, connectorName)).compose(
currentConfig -> {
if (!needsReconfiguring(reconciliation, connectorName, connectorSpec, desiredConfig.asOrderedProperties().asMap(), currentConfig)) {
LOGGER.debugCr(reconciliation, "Connector {} exists and has desired config, {}=={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
return VertxUtil.completableFutureToVertxFuture(apiClient.status(reconciliation, host, port, connectorName))
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, new ArrayList<>()))
.compose(status -> updateState(reconciliation, host, apiClient, connectorName, connectorSpec, status, initialConditions))
.compose(conditions -> manageConnectorOffsets(reconciliation, host, apiClient, connectorName, resource, connectorSpec, conditions))
.compose(conditions -> maybeRestartConnector(reconciliation, host, apiClient, connectorName, resource, conditions))
.compose(conditions -> maybeRestartConnectorTask(reconciliation, host, apiClient, connectorName, resource, conditions))
Expand All @@ -452,7 +461,7 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
} else {
LOGGER.debugCr(reconciliation, "Connector {} exists but does not have desired config, {}!={}", connectorName, desiredConfig.asOrderedProperties().asMap(), currentConfig);
return createOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, desiredConfig)
.compose(createConnectorStatusAndConditions())
.compose(createConnectorStatusAndConditions(initialConditions))
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
}
},
Expand All @@ -461,7 +470,7 @@ protected Future<ConnectorStatusAndConditions> maybeCreateOrUpdateConnector(Reco
&& ((ConnectRestException) error).getStatusCode() == 404) {
LOGGER.debugCr(reconciliation, "Connector {} does not exist", connectorName);
return createOrUpdateConnector(reconciliation, host, apiClient, connectorName, connectorSpec, desiredConfig)
.compose(createConnectorStatusAndConditions())
.compose(createConnectorStatusAndConditions(initialConditions))
.compose(status -> autoRestartFailedConnectorAndTasks(reconciliation, host, apiClient, connectorName, connectorSpec, status, resource))
.compose(status -> updateConnectorTopics(reconciliation, host, apiClient, connectorName, status));
} else {
Expand All @@ -474,11 +483,14 @@ private boolean needsReconfiguring(Reconciliation reconciliation, String connect
KafkaConnectorSpec connectorSpec,
Map<String, String> desiredConfig,
Map<String, String> actualConfig) {
// The actual which comes from Connect API includes tasks.max, connector.class and name,
// The actual which comes from Connect API includes tasks.max, connector.class, connector.plugin.version (if set) and name,
// which connectorSpec.getConfig() does not
if (connectorSpec.getTasksMax() != null) {
desiredConfig.put("tasks.max", connectorSpec.getTasksMax().toString());
}
if (connectorSpec.getVersion() != null) {
desiredConfig.put("connector.plugin.version", connectorSpec.getVersion());
}
desiredConfig.put("name", connectorName);
desiredConfig.put("connector.class", connectorSpec.getClassName());

Expand Down Expand Up @@ -1178,6 +1190,10 @@ private static JsonObject asJson(KafkaConnectorSpec spec, KafkaConnectorConfigur
connectorConfigJson.put("tasks.max", spec.getTasksMax());
}

if (spec.getVersion() != null) {
connectorConfigJson.put("connector.plugin.version", spec.getVersion());
}

return connectorConfigJson.put("connector.class", spec.getClassName());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.strimzi.api.kafka.model.common.CertSecretSource;
import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.connector.AutoRestartStatus;
import io.strimzi.api.kafka.model.connector.KafkaConnector;
import io.strimzi.api.kafka.model.connector.KafkaConnectorSpec;
Expand Down Expand Up @@ -147,8 +148,12 @@ protected Future<KafkaMirrorMaker2Status> createOrUpdate(Reconciliation reconcil
.compose(i -> hasZeroReplicas ? Future.succeededFuture() : reconcileConnectors(reconciliation, kafkaMirrorMaker2, mirrorMaker2Cluster, kafkaMirrorMaker2Status))
.map((Void) null)
.onComplete(reconciliationResult -> {
// Extract warning conditions from reconciliation
List<Condition> warningConditions = kafkaMirrorMaker2Status.getConditions();
StatusUtils.setStatusConditionAndObservedGeneration(kafkaMirrorMaker2, kafkaMirrorMaker2Status, reconciliationResult.cause());
Copy link
Member Author

@katheris katheris Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the call to setStatusConditionAndObservedGeneration overwrites any of the conditions set in the status previously. Since in a future release the connector.plugin.version will be added to the forbidden list I decided not to add an additional method to StatusUtils to add the status condition and observed generation. Instead storing and adding the warning afterwards means we can more easily remove this new code when it is no longer needed.

We could of course change this behaviour to properly handle any status that has been set, but that is out of scope for this particular issue.


if (warningConditions != null && !warningConditions.isEmpty()) {
kafkaMirrorMaker2Status.addConditions(warningConditions);
}
if (!hasZeroReplicas) {
kafkaMirrorMaker2Status.setUrl(KafkaMirrorMaker2Resources.url(mirrorMaker2Cluster.getCluster(), namespace, KafkaMirrorMaker2Cluster.REST_API_PORT));
}
Expand Down
Loading
Loading