diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/Expectation.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/Expectation.java
new file mode 100644
index 0000000000..c9a026cd53
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/Expectation.java
@@ -0,0 +1,32 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import java.util.function.BiPredicate;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+public interface Expectation
{
+
+ String UNNAMED = "unnamed";
+
+ boolean isFulfilled(P primary, Context
context);
+
+ default String name() {
+ return UNNAMED;
+ }
+
+ static
Expectation
createExpectation(
+ String name, BiPredicate
> predicate) {
+ return new Expectation<>() {
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public boolean isFulfilled(P primary, Context
context) {
+ return predicate.test(primary, context);
+ }
+ };
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManager.java
new file mode 100644
index 0000000000..f9f39d64d3
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManager.java
@@ -0,0 +1,63 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+public class ExpectationManager
{
+
+ protected final ConcurrentHashMap> registeredExpectations =
+ new ConcurrentHashMap<>();
+
+ public void setExpectation(P primary, Expectation expectation, Duration timeout) {
+ registeredExpectations.put(
+ ResourceID.fromResource(primary),
+ new RegisteredExpectation<>(LocalDateTime.now(), timeout, expectation));
+ }
+
+ /**
+ * Checks if provided expectation is fulfilled. Return the expectation result. If the result of
+ * expectation is fulfilled or timed out, the expectation is automatically removed;
+ */
+ public Optional> checkOnExpectation(P primary, Context context) {
+ var resourceID = ResourceID.fromResource(primary);
+ var regExp = registeredExpectations.get(ResourceID.fromResource(primary));
+ if (regExp == null) {
+ return Optional.empty();
+ }
+ if (regExp.expectation().isFulfilled(primary, context)) {
+ registeredExpectations.remove(resourceID);
+ return Optional.of(
+ new ExpectationResult<>(regExp.expectation(), ExpectationStatus.FULFILLED));
+ } else if (regExp.isTimedOut()) {
+ registeredExpectations.remove(resourceID);
+ return Optional.of(
+ new ExpectationResult<>(regExp.expectation(), ExpectationStatus.TIMED_OUT));
+ } else {
+ return Optional.of(
+ new ExpectationResult<>(regExp.expectation(), ExpectationStatus.NOT_FULFILLED));
+ }
+ }
+
+ public boolean isExpectationPresent(P primary) {
+ return registeredExpectations.containsKey(ResourceID.fromResource(primary));
+ }
+
+ public Optional> getExpectation(P primary) {
+ var regExp = registeredExpectations.get(ResourceID.fromResource(primary));
+ return Optional.ofNullable(regExp).map(RegisteredExpectation::expectation);
+ }
+
+ public Optional getExpectationName(P primary) {
+ return getExpectation(primary).map(Expectation::name);
+ }
+
+ public void cleanup(P primary) {
+ registeredExpectations.remove(ResourceID.fromResource(primary));
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationResult.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationResult.java
new file mode 100644
index 0000000000..408050421a
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationResult.java
@@ -0,0 +1,19 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+
+public record ExpectationResult(
+ Expectation
expectation, ExpectationStatus status) {
+
+ public boolean isFulfilled() {
+ return status == ExpectationStatus.FULFILLED;
+ }
+
+ public boolean isTimedOut() {
+ return status == ExpectationStatus.TIMED_OUT;
+ }
+
+ public String name() {
+ return expectation.name();
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationStatus.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationStatus.java
new file mode 100644
index 0000000000..55ee791b9d
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationStatus.java
@@ -0,0 +1,7 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+public enum ExpectationStatus {
+ FULFILLED,
+ NOT_FULFILLED,
+ TIMED_OUT
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManager.java
new file mode 100644
index 0000000000..5478141e22
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManager.java
@@ -0,0 +1,64 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache;
+
+public class PeriodicCleanerExpectationManager
+ extends ExpectationManager
{
+
+ private final ScheduledExecutorService scheduler =
+ Executors.newScheduledThreadPool(
+ 1,
+ r -> {
+ Thread thread = Executors.defaultThreadFactory().newThread(r);
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ private final Duration cleanupDelayAfterExpiration;
+ private final IndexedResourceCache
primaryCache;
+
+ public PeriodicCleanerExpectationManager(Duration period, Duration cleanupDelayAfterExpiration) {
+ this(period, cleanupDelayAfterExpiration, null);
+ }
+
+ public PeriodicCleanerExpectationManager(Duration period, IndexedResourceCache
primaryCache) {
+ this(period, null, primaryCache);
+ }
+
+ private PeriodicCleanerExpectationManager(
+ Duration period, Duration cleanupDelayAfterExpiration, IndexedResourceCache
primaryCache) {
+ this.cleanupDelayAfterExpiration = cleanupDelayAfterExpiration;
+ this.primaryCache = primaryCache;
+ scheduler.scheduleWithFixedDelay(
+ this::clean, period.toMillis(), period.toMillis(), TimeUnit.MICROSECONDS);
+ }
+
+ public void clean() {
+ registeredExpectations
+ .entrySet()
+ .removeIf(
+ e -> {
+ if (cleanupDelayAfterExpiration != null) {
+ return LocalDateTime.now()
+ .isAfter(
+ e.getValue()
+ .registeredAt()
+ .plus(e.getValue().timeout())
+ .plus(cleanupDelayAfterExpiration));
+ } else {
+ return primaryCache.get(e.getKey()).isEmpty();
+ }
+ });
+ }
+
+ void stop() {
+ scheduler.shutdownNow();
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/RegisteredExpectation.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/RegisteredExpectation.java
new file mode 100644
index 0000000000..fe24f6dd25
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/RegisteredExpectation.java
@@ -0,0 +1,14 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+
+record RegisteredExpectation
(
+ LocalDateTime registeredAt, Duration timeout, Expectation
expectation) {
+
+ public boolean isTimedOut() {
+ return LocalDateTime.now().isAfter(registeredAt.plus(timeout));
+ }
+}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManagerTest.java
new file mode 100644
index 0000000000..399ea2652f
--- /dev/null
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManagerTest.java
@@ -0,0 +1,158 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class ExpectationManagerTest {
+
+ private ExpectationManager expectationManager;
+ private ConfigMap configMap;
+ private Context context;
+
+ @BeforeEach
+ void setUp() {
+ expectationManager = new ExpectationManager<>();
+ configMap = new ConfigMap();
+ configMap.setMetadata(
+ new ObjectMetaBuilder().withName("test-configmap").withNamespace("test-namespace").build());
+ context = mock(Context.class);
+ }
+
+ @Test
+ void setExpectationShouldStoreExpectation() {
+ Expectation expectation = mock(Expectation.class);
+ Duration timeout = Duration.ofMinutes(5);
+
+ expectationManager.setExpectation(configMap, expectation, timeout);
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+ assertThat(expectationManager.getExpectation(configMap)).contains(expectation);
+ }
+
+ @Test
+ void checkOnExpectationShouldReturnEmptyWhenNoExpectation() {
+ Optional> result =
+ expectationManager.checkOnExpectation(configMap, context);
+
+ assertThat(result).isEmpty();
+ }
+
+ @Test
+ void checkOnExpectationShouldReturnFulfilledWhenExpectationMet() {
+ Expectation expectation = mock(Expectation.class);
+ when(expectation.isFulfilled(configMap, context)).thenReturn(true);
+
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
+ Optional> result =
+ expectationManager.checkOnExpectation(configMap, context);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().status()).isEqualTo(ExpectationStatus.FULFILLED);
+ assertThat(result.get().expectation()).isEqualTo(expectation);
+ assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
+ }
+
+ @Test
+ void checkOnExpectationShouldReturnNotFulfilledWhenExpectationNotMet() {
+ Expectation expectation = mock(Expectation.class);
+ when(expectation.isFulfilled(configMap, context)).thenReturn(false);
+
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
+ Optional> result =
+ expectationManager.checkOnExpectation(configMap, context);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().status()).isEqualTo(ExpectationStatus.NOT_FULFILLED);
+ assertThat(result.get().expectation()).isEqualTo(expectation);
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+ }
+
+ @Test
+ void checkOnExpectationShouldReturnTimedOutWhenExpectationExpired() throws InterruptedException {
+ Expectation expectation = mock(Expectation.class);
+ when(expectation.isFulfilled(configMap, context)).thenReturn(false);
+
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1));
+ Thread.sleep(10);
+ Optional> result =
+ expectationManager.checkOnExpectation(configMap, context);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().status()).isEqualTo(ExpectationStatus.TIMED_OUT);
+ assertThat(result.get().expectation()).isEqualTo(expectation);
+ assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
+ }
+
+ @Test
+ void getExpectationNameShouldReturnExpectationName() {
+ String expectedName = "test-expectation";
+ Expectation expectation = mock(Expectation.class);
+ when(expectation.name()).thenReturn(expectedName);
+
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
+ Optional name = expectationManager.getExpectationName(configMap);
+
+ assertThat(name).contains(expectedName);
+ }
+
+ @Test
+ void getExpectationNameShouldReturnEmptyWhenNoExpectation() {
+ Optional name = expectationManager.getExpectationName(configMap);
+
+ assertThat(name).isEmpty();
+ }
+
+ @Test
+ void cleanupShouldRemoveExpectation() {
+ Expectation expectation = mock(Expectation.class);
+
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+
+ expectationManager.cleanup(configMap);
+ assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
+ }
+
+ @Test
+ void shouldHandleMultipleExpectationsForDifferentResources() {
+ ConfigMap configMap2 = new ConfigMap();
+ configMap2.setMetadata(
+ new ObjectMetaBuilder()
+ .withName("test-configmap-2")
+ .withNamespace("test-namespace")
+ .build());
+
+ Expectation expectation1 = mock(Expectation.class);
+ Expectation expectation2 = mock(Expectation.class);
+
+ expectationManager.setExpectation(configMap, expectation1, Duration.ofMinutes(5));
+ expectationManager.setExpectation(configMap2, expectation2, Duration.ofMinutes(5));
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+ assertThat(expectationManager.isExpectationPresent(configMap2)).isTrue();
+ assertThat(expectationManager.getExpectation(configMap)).contains(expectation1);
+ assertThat(expectationManager.getExpectation(configMap2)).contains(expectation2);
+ }
+
+ @Test
+ void setExpectationShouldReplaceExistingExpectation() {
+ Expectation expectation1 = mock(Expectation.class);
+ Expectation expectation2 = mock(Expectation.class);
+
+ expectationManager.setExpectation(configMap, expectation1, Duration.ofMinutes(5));
+ expectationManager.setExpectation(configMap, expectation2, Duration.ofMinutes(5));
+
+ assertThat(expectationManager.getExpectation(configMap)).contains(expectation2);
+ }
+}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManagerTest.java
new file mode 100644
index 0000000000..0bba070955
--- /dev/null
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManagerTest.java
@@ -0,0 +1,149 @@
+package io.javaoperatorsdk.operator.processing.expectation;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.when;
+
+class PeriodicCleanerExpectationManagerTest {
+
+ @Mock private IndexedResourceCache primaryCache;
+
+ private PeriodicCleanerExpectationManager expectationManager;
+ private ConfigMap configMap;
+ private AutoCloseable closeable;
+
+ @BeforeEach
+ void setUp() {
+ closeable = MockitoAnnotations.openMocks(this);
+ configMap = new ConfigMap();
+ configMap.setMetadata(
+ new ObjectMetaBuilder().withName("test-configmap").withNamespace("test-namespace").build());
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (expectationManager != null) {
+ expectationManager.stop();
+ }
+ closeable.close();
+ }
+
+ @Test
+ void shouldCleanExpiredExpectationsWithCleanupDelay() {
+ Duration period = Duration.ofMillis(50);
+ Duration cleanupDelay = Duration.ofMillis(10);
+ expectationManager = new PeriodicCleanerExpectationManager<>(period, cleanupDelay);
+
+ Expectation expectation = (primary, context) -> false;
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1));
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+
+ await()
+ .atMost(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> assertThat(expectationManager.isExpectationPresent(configMap)).isFalse());
+ }
+
+ @Test
+ void shouldCleanExpectationsWhenResourceNotInCache() {
+ Duration period = Duration.ofMillis(50);
+ expectationManager = new PeriodicCleanerExpectationManager<>(period, primaryCache);
+
+ ResourceID resourceId = ResourceID.fromResource(configMap);
+ when(primaryCache.get(resourceId)).thenReturn(java.util.Optional.empty());
+
+ Expectation expectation = (primary, context) -> false;
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(10));
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+
+ await()
+ .atMost(200, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> assertThat(expectationManager.isExpectationPresent(configMap)).isFalse());
+ }
+
+ @Test
+ void shouldNotCleanExpectationsWhenResourceInCache() throws InterruptedException {
+ Duration period = Duration.ofMillis(50);
+ expectationManager = new PeriodicCleanerExpectationManager<>(period, primaryCache);
+
+ ResourceID resourceId = ResourceID.fromResource(configMap);
+ when(primaryCache.get(resourceId)).thenReturn(java.util.Optional.of(configMap));
+
+ Expectation expectation = (primary, context) -> false;
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(10));
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+
+ Thread.sleep(150);
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+ }
+
+ @Test
+ void shouldNotCleanNonExpiredExpectationsWithCleanupDelay() throws InterruptedException {
+ Duration period = Duration.ofMillis(50);
+ Duration cleanupDelay = Duration.ofMinutes(1);
+ expectationManager = new PeriodicCleanerExpectationManager<>(period, cleanupDelay);
+
+ Expectation expectation = (primary, context) -> false;
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1));
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+
+ Thread.sleep(150);
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+ }
+
+ @Test
+ void stopShouldShutdownScheduler() {
+ Duration period = Duration.ofMillis(50);
+ expectationManager = new PeriodicCleanerExpectationManager<>(period, Duration.ofMillis(10));
+
+ expectationManager.stop();
+
+ Expectation expectation = (primary, context) -> false;
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1));
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+ }
+
+ @Test
+ void cleanShouldWorkDirectly() {
+ Duration period = Duration.ofMinutes(10);
+ Duration cleanupDelay = Duration.ofMillis(1);
+ expectationManager = new PeriodicCleanerExpectationManager<>(period, cleanupDelay);
+
+ Expectation expectation = (primary, context) -> false;
+ expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1));
+
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
+
+ expectationManager.clean();
+
+ assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
+ }
+}
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java
index 1855f89b77..7ac495aeb2 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java
@@ -38,7 +38,6 @@ public JobReconciler(boolean addPrimaryToSecondaryMapper) {
@Override
public UpdateControl reconcile(Job resource, Context context) {
-
if (!getResourceDirectlyFromCache) {
// this is only possible when there is primary to secondary mapper
context