diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/Expectation.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/Expectation.java new file mode 100644 index 0000000000..c9a026cd53 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/Expectation.java @@ -0,0 +1,32 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +public interface Expectation

{ + + String UNNAMED = "unnamed"; + + boolean isFulfilled(P primary, Context

context); + + default String name() { + return UNNAMED; + } + + static

Expectation

createExpectation( + String name, BiPredicate> predicate) { + return new Expectation<>() { + @Override + public String name() { + return name; + } + + @Override + public boolean isFulfilled(P primary, Context

context) { + return predicate.test(primary, context); + } + }; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManager.java new file mode 100644 index 0000000000..f9f39d64d3 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManager.java @@ -0,0 +1,63 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class ExpectationManager

{ + + protected final ConcurrentHashMap> registeredExpectations = + new ConcurrentHashMap<>(); + + public void setExpectation(P primary, Expectation

expectation, Duration timeout) { + registeredExpectations.put( + ResourceID.fromResource(primary), + new RegisteredExpectation<>(LocalDateTime.now(), timeout, expectation)); + } + + /** + * Checks if provided expectation is fulfilled. Return the expectation result. If the result of + * expectation is fulfilled or timed out, the expectation is automatically removed; + */ + public Optional> checkOnExpectation(P primary, Context

context) { + var resourceID = ResourceID.fromResource(primary); + var regExp = registeredExpectations.get(ResourceID.fromResource(primary)); + if (regExp == null) { + return Optional.empty(); + } + if (regExp.expectation().isFulfilled(primary, context)) { + registeredExpectations.remove(resourceID); + return Optional.of( + new ExpectationResult<>(regExp.expectation(), ExpectationStatus.FULFILLED)); + } else if (regExp.isTimedOut()) { + registeredExpectations.remove(resourceID); + return Optional.of( + new ExpectationResult<>(regExp.expectation(), ExpectationStatus.TIMED_OUT)); + } else { + return Optional.of( + new ExpectationResult<>(regExp.expectation(), ExpectationStatus.NOT_FULFILLED)); + } + } + + public boolean isExpectationPresent(P primary) { + return registeredExpectations.containsKey(ResourceID.fromResource(primary)); + } + + public Optional> getExpectation(P primary) { + var regExp = registeredExpectations.get(ResourceID.fromResource(primary)); + return Optional.ofNullable(regExp).map(RegisteredExpectation::expectation); + } + + public Optional getExpectationName(P primary) { + return getExpectation(primary).map(Expectation::name); + } + + public void cleanup(P primary) { + registeredExpectations.remove(ResourceID.fromResource(primary)); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationResult.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationResult.java new file mode 100644 index 0000000000..408050421a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationResult.java @@ -0,0 +1,19 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public record ExpectationResult

( + Expectation

expectation, ExpectationStatus status) { + + public boolean isFulfilled() { + return status == ExpectationStatus.FULFILLED; + } + + public boolean isTimedOut() { + return status == ExpectationStatus.TIMED_OUT; + } + + public String name() { + return expectation.name(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationStatus.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationStatus.java new file mode 100644 index 0000000000..55ee791b9d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationStatus.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +public enum ExpectationStatus { + FULFILLED, + NOT_FULFILLED, + TIMED_OUT +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManager.java new file mode 100644 index 0000000000..5478141e22 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManager.java @@ -0,0 +1,64 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache; + +public class PeriodicCleanerExpectationManager

+ extends ExpectationManager

{ + + private final ScheduledExecutorService scheduler = + Executors.newScheduledThreadPool( + 1, + r -> { + Thread thread = Executors.defaultThreadFactory().newThread(r); + thread.setDaemon(true); + return thread; + }); + + private final Duration cleanupDelayAfterExpiration; + private final IndexedResourceCache

primaryCache; + + public PeriodicCleanerExpectationManager(Duration period, Duration cleanupDelayAfterExpiration) { + this(period, cleanupDelayAfterExpiration, null); + } + + public PeriodicCleanerExpectationManager(Duration period, IndexedResourceCache

primaryCache) { + this(period, null, primaryCache); + } + + private PeriodicCleanerExpectationManager( + Duration period, Duration cleanupDelayAfterExpiration, IndexedResourceCache

primaryCache) { + this.cleanupDelayAfterExpiration = cleanupDelayAfterExpiration; + this.primaryCache = primaryCache; + scheduler.scheduleWithFixedDelay( + this::clean, period.toMillis(), period.toMillis(), TimeUnit.MICROSECONDS); + } + + public void clean() { + registeredExpectations + .entrySet() + .removeIf( + e -> { + if (cleanupDelayAfterExpiration != null) { + return LocalDateTime.now() + .isAfter( + e.getValue() + .registeredAt() + .plus(e.getValue().timeout()) + .plus(cleanupDelayAfterExpiration)); + } else { + return primaryCache.get(e.getKey()).isEmpty(); + } + }); + } + + void stop() { + scheduler.shutdownNow(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/RegisteredExpectation.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/RegisteredExpectation.java new file mode 100644 index 0000000000..fe24f6dd25 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/expectation/RegisteredExpectation.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import java.time.Duration; +import java.time.LocalDateTime; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +record RegisteredExpectation

( + LocalDateTime registeredAt, Duration timeout, Expectation

expectation) { + + public boolean isTimedOut() { + return LocalDateTime.now().isAfter(registeredAt.plus(timeout)); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManagerTest.java new file mode 100644 index 0000000000..399ea2652f --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/ExpectationManagerTest.java @@ -0,0 +1,158 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import java.time.Duration; +import java.util.Optional; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class ExpectationManagerTest { + + private ExpectationManager expectationManager; + private ConfigMap configMap; + private Context context; + + @BeforeEach + void setUp() { + expectationManager = new ExpectationManager<>(); + configMap = new ConfigMap(); + configMap.setMetadata( + new ObjectMetaBuilder().withName("test-configmap").withNamespace("test-namespace").build()); + context = mock(Context.class); + } + + @Test + void setExpectationShouldStoreExpectation() { + Expectation expectation = mock(Expectation.class); + Duration timeout = Duration.ofMinutes(5); + + expectationManager.setExpectation(configMap, expectation, timeout); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + assertThat(expectationManager.getExpectation(configMap)).contains(expectation); + } + + @Test + void checkOnExpectationShouldReturnEmptyWhenNoExpectation() { + Optional> result = + expectationManager.checkOnExpectation(configMap, context); + + assertThat(result).isEmpty(); + } + + @Test + void checkOnExpectationShouldReturnFulfilledWhenExpectationMet() { + Expectation expectation = mock(Expectation.class); + when(expectation.isFulfilled(configMap, context)).thenReturn(true); + + expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5)); + Optional> result = + expectationManager.checkOnExpectation(configMap, context); + + assertThat(result).isPresent(); + assertThat(result.get().status()).isEqualTo(ExpectationStatus.FULFILLED); + assertThat(result.get().expectation()).isEqualTo(expectation); + assertThat(expectationManager.isExpectationPresent(configMap)).isFalse(); + } + + @Test + void checkOnExpectationShouldReturnNotFulfilledWhenExpectationNotMet() { + Expectation expectation = mock(Expectation.class); + when(expectation.isFulfilled(configMap, context)).thenReturn(false); + + expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5)); + Optional> result = + expectationManager.checkOnExpectation(configMap, context); + + assertThat(result).isPresent(); + assertThat(result.get().status()).isEqualTo(ExpectationStatus.NOT_FULFILLED); + assertThat(result.get().expectation()).isEqualTo(expectation); + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + } + + @Test + void checkOnExpectationShouldReturnTimedOutWhenExpectationExpired() throws InterruptedException { + Expectation expectation = mock(Expectation.class); + when(expectation.isFulfilled(configMap, context)).thenReturn(false); + + expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1)); + Thread.sleep(10); + Optional> result = + expectationManager.checkOnExpectation(configMap, context); + + assertThat(result).isPresent(); + assertThat(result.get().status()).isEqualTo(ExpectationStatus.TIMED_OUT); + assertThat(result.get().expectation()).isEqualTo(expectation); + assertThat(expectationManager.isExpectationPresent(configMap)).isFalse(); + } + + @Test + void getExpectationNameShouldReturnExpectationName() { + String expectedName = "test-expectation"; + Expectation expectation = mock(Expectation.class); + when(expectation.name()).thenReturn(expectedName); + + expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5)); + Optional name = expectationManager.getExpectationName(configMap); + + assertThat(name).contains(expectedName); + } + + @Test + void getExpectationNameShouldReturnEmptyWhenNoExpectation() { + Optional name = expectationManager.getExpectationName(configMap); + + assertThat(name).isEmpty(); + } + + @Test + void cleanupShouldRemoveExpectation() { + Expectation expectation = mock(Expectation.class); + + expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5)); + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + + expectationManager.cleanup(configMap); + assertThat(expectationManager.isExpectationPresent(configMap)).isFalse(); + } + + @Test + void shouldHandleMultipleExpectationsForDifferentResources() { + ConfigMap configMap2 = new ConfigMap(); + configMap2.setMetadata( + new ObjectMetaBuilder() + .withName("test-configmap-2") + .withNamespace("test-namespace") + .build()); + + Expectation expectation1 = mock(Expectation.class); + Expectation expectation2 = mock(Expectation.class); + + expectationManager.setExpectation(configMap, expectation1, Duration.ofMinutes(5)); + expectationManager.setExpectation(configMap2, expectation2, Duration.ofMinutes(5)); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + assertThat(expectationManager.isExpectationPresent(configMap2)).isTrue(); + assertThat(expectationManager.getExpectation(configMap)).contains(expectation1); + assertThat(expectationManager.getExpectation(configMap2)).contains(expectation2); + } + + @Test + void setExpectationShouldReplaceExistingExpectation() { + Expectation expectation1 = mock(Expectation.class); + Expectation expectation2 = mock(Expectation.class); + + expectationManager.setExpectation(configMap, expectation1, Duration.ofMinutes(5)); + expectationManager.setExpectation(configMap, expectation2, Duration.ofMinutes(5)); + + assertThat(expectationManager.getExpectation(configMap)).contains(expectation2); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManagerTest.java new file mode 100644 index 0000000000..0bba070955 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/expectation/PeriodicCleanerExpectationManagerTest.java @@ -0,0 +1,149 @@ +package io.javaoperatorsdk.operator.processing.expectation; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.when; + +class PeriodicCleanerExpectationManagerTest { + + @Mock private IndexedResourceCache primaryCache; + + private PeriodicCleanerExpectationManager expectationManager; + private ConfigMap configMap; + private AutoCloseable closeable; + + @BeforeEach + void setUp() { + closeable = MockitoAnnotations.openMocks(this); + configMap = new ConfigMap(); + configMap.setMetadata( + new ObjectMetaBuilder().withName("test-configmap").withNamespace("test-namespace").build()); + } + + @AfterEach + void tearDown() throws Exception { + if (expectationManager != null) { + expectationManager.stop(); + } + closeable.close(); + } + + @Test + void shouldCleanExpiredExpectationsWithCleanupDelay() { + Duration period = Duration.ofMillis(50); + Duration cleanupDelay = Duration.ofMillis(10); + expectationManager = new PeriodicCleanerExpectationManager<>(period, cleanupDelay); + + Expectation expectation = (primary, context) -> false; + expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1)); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + + await() + .atMost(200, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> assertThat(expectationManager.isExpectationPresent(configMap)).isFalse()); + } + + @Test + void shouldCleanExpectationsWhenResourceNotInCache() { + Duration period = Duration.ofMillis(50); + expectationManager = new PeriodicCleanerExpectationManager<>(period, primaryCache); + + ResourceID resourceId = ResourceID.fromResource(configMap); + when(primaryCache.get(resourceId)).thenReturn(java.util.Optional.empty()); + + Expectation expectation = (primary, context) -> false; + expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(10)); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + + await() + .atMost(200, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> assertThat(expectationManager.isExpectationPresent(configMap)).isFalse()); + } + + @Test + void shouldNotCleanExpectationsWhenResourceInCache() throws InterruptedException { + Duration period = Duration.ofMillis(50); + expectationManager = new PeriodicCleanerExpectationManager<>(period, primaryCache); + + ResourceID resourceId = ResourceID.fromResource(configMap); + when(primaryCache.get(resourceId)).thenReturn(java.util.Optional.of(configMap)); + + Expectation expectation = (primary, context) -> false; + expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(10)); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + + Thread.sleep(150); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + } + + @Test + void shouldNotCleanNonExpiredExpectationsWithCleanupDelay() throws InterruptedException { + Duration period = Duration.ofMillis(50); + Duration cleanupDelay = Duration.ofMinutes(1); + expectationManager = new PeriodicCleanerExpectationManager<>(period, cleanupDelay); + + Expectation expectation = (primary, context) -> false; + expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1)); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + + Thread.sleep(150); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + } + + @Test + void stopShouldShutdownScheduler() { + Duration period = Duration.ofMillis(50); + expectationManager = new PeriodicCleanerExpectationManager<>(period, Duration.ofMillis(10)); + + expectationManager.stop(); + + Expectation expectation = (primary, context) -> false; + expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1)); + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + } + + @Test + void cleanShouldWorkDirectly() { + Duration period = Duration.ofMinutes(10); + Duration cleanupDelay = Duration.ofMillis(1); + expectationManager = new PeriodicCleanerExpectationManager<>(period, cleanupDelay); + + Expectation expectation = (primary, context) -> false; + expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1)); + + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + assertThat(expectationManager.isExpectationPresent(configMap)).isTrue(); + + expectationManager.clean(); + + assertThat(expectationManager.isExpectationPresent(configMap)).isFalse(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java index 1855f89b77..7ac495aeb2 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/primarytosecondary/JobReconciler.java @@ -38,7 +38,6 @@ public JobReconciler(boolean addPrimaryToSecondaryMapper) { @Override public UpdateControl reconcile(Job resource, Context context) { - if (!getResourceDirectlyFromCache) { // this is only possible when there is primary to secondary mapper context