Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.processing.expectation;

import java.util.function.BiPredicate;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;

public interface Expectation<P extends HasMetadata> {

String UNNAMED = "unnamed";

boolean isFulfilled(P primary, Context<P> context);

default String name() {
return UNNAMED;
}

static <P extends HasMetadata> Expectation<P> createExpectation(
String name, BiPredicate<P, Context<P>> predicate) {
return new Expectation<>() {
@Override
public String name() {
return name;
}

@Override
public boolean isFulfilled(P primary, Context<P> context) {
return predicate.test(primary, context);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package io.javaoperatorsdk.operator.processing.expectation;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class ExpectationManager<P extends HasMetadata> {

protected final ConcurrentHashMap<ResourceID, RegisteredExpectation<P>> registeredExpectations =
new ConcurrentHashMap<>();

public void setExpectation(P primary, Expectation<P> expectation, Duration timeout) {
registeredExpectations.put(
ResourceID.fromResource(primary),
new RegisteredExpectation<>(LocalDateTime.now(), timeout, expectation));
}

/**
* Checks if provided expectation is fulfilled. Return the expectation result. If the result of
* expectation is fulfilled or timed out, the expectation is automatically removed;
*/
public Optional<ExpectationResult<P>> checkOnExpectation(P primary, Context<P> context) {
var resourceID = ResourceID.fromResource(primary);
var regExp = registeredExpectations.get(ResourceID.fromResource(primary));
if (regExp == null) {
return Optional.empty();
}
if (regExp.expectation().isFulfilled(primary, context)) {
registeredExpectations.remove(resourceID);
return Optional.of(
new ExpectationResult<>(regExp.expectation(), ExpectationStatus.FULFILLED));
} else if (regExp.isTimedOut()) {
registeredExpectations.remove(resourceID);
return Optional.of(
new ExpectationResult<>(regExp.expectation(), ExpectationStatus.TIMED_OUT));
} else {
return Optional.of(
new ExpectationResult<>(regExp.expectation(), ExpectationStatus.NOT_FULFILLED));
}
}

public boolean isExpectationPresent(P primary) {
return registeredExpectations.containsKey(ResourceID.fromResource(primary));
}

public Optional<Expectation<P>> getExpectation(P primary) {
var regExp = registeredExpectations.get(ResourceID.fromResource(primary));
return Optional.ofNullable(regExp).map(RegisteredExpectation::expectation);
}

public Optional<String> getExpectationName(P primary) {
return getExpectation(primary).map(Expectation::name);
}

public void cleanup(P primary) {
registeredExpectations.remove(ResourceID.fromResource(primary));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.javaoperatorsdk.operator.processing.expectation;

import io.fabric8.kubernetes.api.model.HasMetadata;

public record ExpectationResult<P extends HasMetadata>(
Expectation<P> expectation, ExpectationStatus status) {

public boolean isFulfilled() {
return status == ExpectationStatus.FULFILLED;
}

public boolean isTimedOut() {
return status == ExpectationStatus.TIMED_OUT;
}

public String name() {
return expectation.name();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.javaoperatorsdk.operator.processing.expectation;

public enum ExpectationStatus {
FULFILLED,
NOT_FULFILLED,
TIMED_OUT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.javaoperatorsdk.operator.processing.expectation;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache;

public class PeriodicCleanerExpectationManager<P extends HasMetadata>
extends ExpectationManager<P> {

private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(
1,
r -> {
Thread thread = Executors.defaultThreadFactory().newThread(r);
thread.setDaemon(true);
return thread;
});

private final Duration cleanupDelayAfterExpiration;
private final IndexedResourceCache<P> primaryCache;

public PeriodicCleanerExpectationManager(Duration period, Duration cleanupDelayAfterExpiration) {
this(period, cleanupDelayAfterExpiration, null);
}

public PeriodicCleanerExpectationManager(Duration period, IndexedResourceCache<P> primaryCache) {
this(period, null, primaryCache);
}

private PeriodicCleanerExpectationManager(
Duration period, Duration cleanupDelayAfterExpiration, IndexedResourceCache<P> primaryCache) {
this.cleanupDelayAfterExpiration = cleanupDelayAfterExpiration;
this.primaryCache = primaryCache;
scheduler.scheduleWithFixedDelay(
this::clean, period.toMillis(), period.toMillis(), TimeUnit.MICROSECONDS);
}

public void clean() {
registeredExpectations
.entrySet()
.removeIf(
e -> {
if (cleanupDelayAfterExpiration != null) {
return LocalDateTime.now()
.isAfter(
e.getValue()
.registeredAt()
.plus(e.getValue().timeout())
.plus(cleanupDelayAfterExpiration));
} else {
return primaryCache.get(e.getKey()).isEmpty();
}
});
}

void stop() {
scheduler.shutdownNow();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.processing.expectation;

import java.time.Duration;
import java.time.LocalDateTime;

import io.fabric8.kubernetes.api.model.HasMetadata;

record RegisteredExpectation<P extends HasMetadata>(
LocalDateTime registeredAt, Duration timeout, Expectation<P> expectation) {

public boolean isTimedOut() {
return LocalDateTime.now().isAfter(registeredAt.plus(timeout));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package io.javaoperatorsdk.operator.processing.expectation;

import java.time.Duration;
import java.util.Optional;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.api.reconciler.Context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class ExpectationManagerTest {

private ExpectationManager<ConfigMap> expectationManager;
private ConfigMap configMap;
private Context<ConfigMap> context;

@BeforeEach
void setUp() {
expectationManager = new ExpectationManager<>();
configMap = new ConfigMap();
configMap.setMetadata(
new ObjectMetaBuilder().withName("test-configmap").withNamespace("test-namespace").build());
context = mock(Context.class);
}

@Test
void setExpectationShouldStoreExpectation() {
Expectation<ConfigMap> expectation = mock(Expectation.class);
Duration timeout = Duration.ofMinutes(5);

expectationManager.setExpectation(configMap, expectation, timeout);

assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
assertThat(expectationManager.getExpectation(configMap)).contains(expectation);
}

@Test
void checkOnExpectationShouldReturnEmptyWhenNoExpectation() {
Optional<ExpectationResult<ConfigMap>> result =
expectationManager.checkOnExpectation(configMap, context);

assertThat(result).isEmpty();
}

@Test
void checkOnExpectationShouldReturnFulfilledWhenExpectationMet() {
Expectation<ConfigMap> expectation = mock(Expectation.class);
when(expectation.isFulfilled(configMap, context)).thenReturn(true);

expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
Optional<ExpectationResult<ConfigMap>> result =
expectationManager.checkOnExpectation(configMap, context);

assertThat(result).isPresent();
assertThat(result.get().status()).isEqualTo(ExpectationStatus.FULFILLED);
assertThat(result.get().expectation()).isEqualTo(expectation);
assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
}

@Test
void checkOnExpectationShouldReturnNotFulfilledWhenExpectationNotMet() {
Expectation<ConfigMap> expectation = mock(Expectation.class);
when(expectation.isFulfilled(configMap, context)).thenReturn(false);

expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
Optional<ExpectationResult<ConfigMap>> result =
expectationManager.checkOnExpectation(configMap, context);

assertThat(result).isPresent();
assertThat(result.get().status()).isEqualTo(ExpectationStatus.NOT_FULFILLED);
assertThat(result.get().expectation()).isEqualTo(expectation);
assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
}

@Test
void checkOnExpectationShouldReturnTimedOutWhenExpectationExpired() throws InterruptedException {
Expectation<ConfigMap> expectation = mock(Expectation.class);
when(expectation.isFulfilled(configMap, context)).thenReturn(false);

expectationManager.setExpectation(configMap, expectation, Duration.ofMillis(1));
Thread.sleep(10);
Optional<ExpectationResult<ConfigMap>> result =
expectationManager.checkOnExpectation(configMap, context);

assertThat(result).isPresent();
assertThat(result.get().status()).isEqualTo(ExpectationStatus.TIMED_OUT);
assertThat(result.get().expectation()).isEqualTo(expectation);
assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
}

@Test
void getExpectationNameShouldReturnExpectationName() {
String expectedName = "test-expectation";
Expectation<ConfigMap> expectation = mock(Expectation.class);
when(expectation.name()).thenReturn(expectedName);

expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
Optional<String> name = expectationManager.getExpectationName(configMap);

assertThat(name).contains(expectedName);
}

@Test
void getExpectationNameShouldReturnEmptyWhenNoExpectation() {
Optional<String> name = expectationManager.getExpectationName(configMap);

assertThat(name).isEmpty();
}

@Test
void cleanupShouldRemoveExpectation() {
Expectation<ConfigMap> expectation = mock(Expectation.class);

expectationManager.setExpectation(configMap, expectation, Duration.ofMinutes(5));
assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();

expectationManager.cleanup(configMap);
assertThat(expectationManager.isExpectationPresent(configMap)).isFalse();
}

@Test
void shouldHandleMultipleExpectationsForDifferentResources() {
ConfigMap configMap2 = new ConfigMap();
configMap2.setMetadata(
new ObjectMetaBuilder()
.withName("test-configmap-2")
.withNamespace("test-namespace")
.build());

Expectation<ConfigMap> expectation1 = mock(Expectation.class);
Expectation<ConfigMap> expectation2 = mock(Expectation.class);

expectationManager.setExpectation(configMap, expectation1, Duration.ofMinutes(5));
expectationManager.setExpectation(configMap2, expectation2, Duration.ofMinutes(5));

assertThat(expectationManager.isExpectationPresent(configMap)).isTrue();
assertThat(expectationManager.isExpectationPresent(configMap2)).isTrue();
assertThat(expectationManager.getExpectation(configMap)).contains(expectation1);
assertThat(expectationManager.getExpectation(configMap2)).contains(expectation2);
}

@Test
void setExpectationShouldReplaceExistingExpectation() {
Expectation<ConfigMap> expectation1 = mock(Expectation.class);
Expectation<ConfigMap> expectation2 = mock(Expectation.class);

expectationManager.setExpectation(configMap, expectation1, Duration.ofMinutes(5));
expectationManager.setExpectation(configMap, expectation2, Duration.ofMinutes(5));

assertThat(expectationManager.getExpectation(configMap)).contains(expectation2);
}
}
Loading