diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java index a4b336b8b94..542ca027c89 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java @@ -15,8 +15,10 @@ import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper; import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordProcessorModel; import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.OpenTelemetryConfigurationModel; import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.SamplerModel; +import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.io.Closeable; import java.io.IOException; @@ -210,6 +212,29 @@ public static Sampler createSampler(DeclarativeConfigProperties genericSamplerMo samplerModel); } + /** + * Create a {@link LogRecordProcessor} from the {@code logRecordProcessorModel} representing the + * log record processor config. + * + *

This is used when log record processors are composed, with one processor accepting one or + * more additional processors as config properties. The {@link ComponentProvider} implementation + * can call this to configure a delegate {@link LogRecordProcessor} from the {@link + * DeclarativeConfigProperties} corresponding to a particular config property. + */ + public static LogRecordProcessor createLogRecordProcessor( + DeclarativeConfigProperties genericLogRecordProcessorModel) { + YamlDeclarativeConfigProperties yamlDeclarativeConfigProperties = + requireYamlDeclarativeConfigProperties(genericLogRecordProcessorModel); + LogRecordProcessorModel logRecordProcessorModel = + MAPPER.convertValue( + DeclarativeConfigProperties.toMap(yamlDeclarativeConfigProperties), + LogRecordProcessorModel.class); + return createAndMaybeCleanup( + LogRecordProcessorFactory.getInstance(), + SpiHelper.create(yamlDeclarativeConfigProperties.getComponentLoader()), + logRecordProcessorModel); + } + private static YamlDeclarativeConfigProperties requireYamlDeclarativeConfigProperties( DeclarativeConfigProperties declarativeConfigProperties) { if (!(declarativeConfigProperties instanceof YamlDeclarativeConfigProperties)) { diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java new file mode 100644 index 00000000000..22f06fe495c --- /dev/null +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig.internal; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor; + +/** + * ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class SeverityBasedLogRecordProcessorComponentProvider + implements ComponentProvider { + + @Override + public Class getType() { + return LogRecordProcessor.class; + } + + @Override + public String getName() { + return "severity_based"; + } + + @Override + public LogRecordProcessor create(DeclarativeConfigProperties config) { + String minimumSeverityStr = config.getString("minimum_severity"); + if (minimumSeverityStr == null) { + throw new IllegalArgumentException( + "minimum_severity is required for severity_based log processors"); + } + + Severity minimumSeverity; + try { + minimumSeverity = Severity.valueOf(minimumSeverityStr); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid severity value: " + minimumSeverityStr, e); + } + + DeclarativeConfigProperties delegateConfig = config.getStructured("delegate"); + if (delegateConfig == null) { + throw new IllegalArgumentException("delegate is required for severity_based log processors"); + } + + LogRecordProcessor delegate = DeclarativeConfiguration.createLogRecordProcessor(delegateConfig); + + return SeverityBasedLogRecordProcessor.builder(minimumSeverity, delegate).build(); + } +} diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java new file mode 100644 index 00000000000..5e8e5d5a4a0 --- /dev/null +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/TraceBasedLogRecordProcessorComponentProvider.java @@ -0,0 +1,44 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig.internal; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor; + +/** + * ComponentProvider for TraceBasedLogRecordProcessor to support declarative configuration. + * + *

This class is internal and is hence not for public use. Its APIs are unstable and can change + * at any time. + */ +public class TraceBasedLogRecordProcessorComponentProvider + implements ComponentProvider { + + @Override + public Class getType() { + return LogRecordProcessor.class; + } + + @Override + public String getName() { + return "trace_based"; + } + + @Override + public LogRecordProcessor create(DeclarativeConfigProperties config) { + DeclarativeConfigProperties delegateConfig = config.getStructured("delegate"); + if (delegateConfig == null) { + throw new IllegalArgumentException("delegate is required for trace_based log processors"); + } + + LogRecordProcessor delegate = DeclarativeConfiguration.createLogRecordProcessor(delegateConfig); + + return TraceBasedLogRecordProcessor.builder(delegate).build(); + } +} diff --git a/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider b/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider index a1a361a5f37..7f54952442e 100644 --- a/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider +++ b/sdk-extensions/incubator/src/main/resources/META-INF/services/io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider @@ -1 +1,3 @@ io.opentelemetry.sdk.extension.incubator.fileconfig.ServiceResourceDetector +io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider +io.opentelemetry.sdk.extension.incubator.fileconfig.internal.TraceBasedLogRecordProcessorComponentProvider diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java new file mode 100644 index 00000000000..c1b72af89bf --- /dev/null +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/SeverityBasedLogRecordProcessorComponentProviderTest.java @@ -0,0 +1,133 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.common.ComponentLoader; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.SeverityBasedLogRecordProcessorComponentProvider; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class SeverityBasedLogRecordProcessorComponentProviderTest { + + @Test + void createSeverityBasedProcessor_DirectComponentProvider() { + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class); + assertThat(provider.getName()).isEqualTo("severity_based"); + } + + @Test + void createSeverityBasedProcessor_ValidConfig() { + DeclarativeConfigProperties config = + getConfig( + "minimum_severity: \"WARN\"\n" + + "delegate:\n" + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class); + + assertThat(processor.toString()) + .contains("minimumSeverity=WARN") + .contains("delegate=SimpleLogRecordProcessor") + .contains("logRecordExporter=SystemOutLogRecordExporter"); + } + + @Test + void createSeverityBasedProcessor_MissingMinimumSeverity() { + DeclarativeConfigProperties config = + getConfig( + "delegate:\n" // this comment exists only to influence spotless formatting + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("minimum_severity is required for severity_based log processors"); + } + + @Test + void createSeverityBasedProcessor_InvalidSeverity() { + + DeclarativeConfigProperties config = + getConfig( + "minimum_severity: \"INVALID\"\n" + + "delegate:\n" + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Invalid severity value: INVALID"); + } + + @Test + void createSeverityBasedProcessor_MissingDelegate() { + DeclarativeConfigProperties config = getConfig("minimum_severity: \"WARN\"\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("delegate is required for severity_based log processors"); + } + + @Test + void createSeverityBasedProcessor_SingleDelegate() { + DeclarativeConfigProperties config = + getConfig( + "minimum_severity: \"INFO\"\n" + + "delegate:\n" + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); + + SeverityBasedLogRecordProcessorComponentProvider provider = + new SeverityBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(SeverityBasedLogRecordProcessor.class); + assertThat(processor.toString()).contains("SeverityBasedLogRecordProcessor"); + } + + private static DeclarativeConfigProperties getConfig(String yaml) { + Object yamlObj = + DeclarativeConfiguration.loadYaml( + new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap()); + + return DeclarativeConfiguration.toConfigProperties( + yamlObj, + ComponentLoader.forClassLoader( + SeverityBasedLogRecordProcessorComponentProviderTest.class.getClassLoader())); + } +} diff --git a/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java new file mode 100644 index 00000000000..32094743bcc --- /dev/null +++ b/sdk-extensions/incubator/src/test/java/io/opentelemetry/sdk/extension/incubator/fileconfig/TraceBasedLogRecordProcessorComponentProviderTest.java @@ -0,0 +1,91 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.common.ComponentLoader; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.TraceBasedLogRecordProcessorComponentProvider; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.TraceBasedLogRecordProcessor; +import java.io.ByteArrayInputStream; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +class TraceBasedLogRecordProcessorComponentProviderTest { + + @Test + void createTraceBasedProcessor_DirectComponentProvider() { + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + assertThat(provider.getType()).isEqualTo(LogRecordProcessor.class); + assertThat(provider.getName()).isEqualTo("trace_based"); + } + + @Test + void createTraceBasedProcessor_ValidConfig() { + DeclarativeConfigProperties config = + getConfig( + "delegate:\n" // this comment exists only to influence spotless formatting + + " simple:\n" + + " exporter:\n" + + " console: {}\n"); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(TraceBasedLogRecordProcessor.class); + + assertThat(processor.toString()) + .contains("TraceBasedLogRecordProcessor") + .contains("delegate=SimpleLogRecordProcessor") + .contains("logRecordExporter=SystemOutLogRecordExporter"); + } + + @Test + void createTraceBasedProcessor_MissingDelegate() { + DeclarativeConfigProperties config = getConfig(""); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + assertThatThrownBy(() -> provider.create(config)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("delegate is required for trace_based log processors"); + } + + @Test + void createTraceBasedProcessor_SingleDelegate() { + DeclarativeConfigProperties config = + getConfig("delegate:\n" + " simple:\n" + " exporter:\n" + " console: {}\n"); + + TraceBasedLogRecordProcessorComponentProvider provider = + new TraceBasedLogRecordProcessorComponentProvider(); + + LogRecordProcessor processor = provider.create(config); + + assertThat(processor).isInstanceOf(TraceBasedLogRecordProcessor.class); + assertThat(processor.toString()).contains("TraceBasedLogRecordProcessor"); + } + + private static DeclarativeConfigProperties getConfig(String yaml) { + Object yamlObj = + DeclarativeConfiguration.loadYaml( + new ByteArrayInputStream(yaml.getBytes(StandardCharsets.UTF_8)), + Collections.emptyMap()); + + return DeclarativeConfiguration.toConfigProperties( + yamlObj, + ComponentLoader.forClassLoader( + TraceBasedLogRecordProcessorComponentProviderTest.class.getClassLoader())); + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java new file mode 100644 index 00000000000..d1befb23694 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessor.java @@ -0,0 +1,69 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; + +/** + * Implementation of {@link LogRecordProcessor} that filters log records based on minimum severity + * level and delegates to downstream processors. + * + *

Only log records with severity greater than or equal to the configured minimum are forwarded. + */ +public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor { + + private final Severity minimumSeverity; + private final LogRecordProcessor delegate; + + SeverityBasedLogRecordProcessor(Severity minimumSeverity, LogRecordProcessor delegate) { + this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity"); + this.delegate = requireNonNull(delegate, "delegate"); + } + + /** + * Returns a new {@link SeverityBasedLogRecordProcessorBuilder} to construct a {@link + * SeverityBasedLogRecordProcessor}. + * + * @param minimumSeverity the minimum severity level required for processing + * @param delegate the processor to delegate to + * @return a new {@link SeverityBasedLogRecordProcessorBuilder} + */ + public static SeverityBasedLogRecordProcessorBuilder builder( + Severity minimumSeverity, LogRecordProcessor delegate) { + return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity, delegate); + } + + @Override + public void onEmit(Context context, ReadWriteLogRecord logRecord) { + if (logRecord.getSeverity().getSeverityNumber() >= minimumSeverity.getSeverityNumber()) { + delegate.onEmit(context, logRecord); + } + } + + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return delegate.forceFlush(); + } + + @Override + public String toString() { + return "SeverityBasedLogRecordProcessor{" + + "minimumSeverity=" + + minimumSeverity + + ", delegate=" + + delegate + + '}'; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java new file mode 100644 index 00000000000..9e99eedd14e --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java @@ -0,0 +1,31 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.api.logs.Severity; + +/** Builder class for {@link SeverityBasedLogRecordProcessor}. */ +public final class SeverityBasedLogRecordProcessorBuilder { + + private final Severity minimumSeverity; + private final LogRecordProcessor delegate; + + SeverityBasedLogRecordProcessorBuilder(Severity minimumSeverity, LogRecordProcessor delegate) { + this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity"); + this.delegate = requireNonNull(delegate, "delegate"); + } + + /** + * Returns a new {@link SeverityBasedLogRecordProcessor} with the configuration of this builder. + * + * @return a new {@link SeverityBasedLogRecordProcessor} + */ + public SeverityBasedLogRecordProcessor build() { + return new SeverityBasedLogRecordProcessor(minimumSeverity, delegate); + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java new file mode 100644 index 00000000000..c7130f1a568 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; + +/** + * A {@link LogRecordProcessor} that filters out log records associated with sampled out spans. + * + *

Log records not tied to any span (invalid span context) are not sampled out. + */ +public final class TraceBasedLogRecordProcessor implements LogRecordProcessor { + + private final LogRecordProcessor delegate; + + TraceBasedLogRecordProcessor(LogRecordProcessor delegate) { + this.delegate = requireNonNull(delegate, "delegate"); + } + + /** + * Returns a new {@link TraceBasedLogRecordProcessorBuilder} to construct a {@link + * TraceBasedLogRecordProcessor}. + * + * @param delegate the processor to delegate to + * @return a new {@link TraceBasedLogRecordProcessorBuilder} + */ + public static TraceBasedLogRecordProcessorBuilder builder(LogRecordProcessor delegate) { + return new TraceBasedLogRecordProcessorBuilder(delegate); + } + + @Override + public void onEmit(Context context, ReadWriteLogRecord logRecord) { + if (logRecord.getSpanContext().isValid() && !logRecord.getSpanContext().isSampled()) { + return; + } + delegate.onEmit(context, logRecord); + } + + @Override + public CompletableResultCode shutdown() { + return delegate.shutdown(); + } + + @Override + public CompletableResultCode forceFlush() { + return delegate.forceFlush(); + } + + @Override + public String toString() { + return "TraceBasedLogRecordProcessor{" + "delegate=" + delegate + '}'; + } +} diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java new file mode 100644 index 00000000000..a8d9f505692 --- /dev/null +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java @@ -0,0 +1,27 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static java.util.Objects.requireNonNull; + +/** Builder class for {@link TraceBasedLogRecordProcessor}. */ +public final class TraceBasedLogRecordProcessorBuilder { + + private final LogRecordProcessor delegate; + + TraceBasedLogRecordProcessorBuilder(LogRecordProcessor delegate) { + this.delegate = requireNonNull(delegate, "delegate"); + } + + /** + * Returns a new {@link TraceBasedLogRecordProcessor} with the configuration of this builder. + * + * @return a new {@link TraceBasedLogRecordProcessor} + */ + public TraceBasedLogRecordProcessor build() { + return new TraceBasedLogRecordProcessor(delegate); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java new file mode 100644 index 00000000000..d00b3f295b2 --- /dev/null +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java @@ -0,0 +1,214 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class SeverityBasedLogRecordProcessorTest { + + @Mock private LogRecordProcessor delegate; + @Mock private ReadWriteLogRecord logRecord; + + private Context context; + + @BeforeEach + void setUp() { + context = Context.current(); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + } + + @Test + void builder_RequiresMinimumSeverity() { + assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(null, delegate)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("minimumSeverity"); + } + + @Test + void builder_RequiresProcessor() { + assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(Severity.INFO, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("delegate"); + } + + @Test + void onEmit_SeverityMeetsMinimum_DelegatesToProcessor() { + when(logRecord.getSeverity()).thenReturn(Severity.WARN); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_SeverityAboveMinimum_DelegatesToProcessor() { + when(logRecord.getSeverity()).thenReturn(Severity.ERROR); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_SeverityBelowMinimum_DoesNotDelegate() { + when(logRecord.getSeverity()).thenReturn(Severity.DEBUG); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate, never()).onEmit(any(), any()); + } + + @Test + void onEmit_UndefinedSeverity_DoesNotDelegate() { + when(logRecord.getSeverity()).thenReturn(Severity.UNDEFINED_SEVERITY_NUMBER); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate, never()).onEmit(any(), any()); + } + + @Test + void onEmit_VariousSeverityLevels() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); + + // Test all severity levels + testSeverityLevel(processor, Severity.UNDEFINED_SEVERITY_NUMBER, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE2, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE3, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.TRACE4, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG2, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG3, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.DEBUG4, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO2, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO3, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.INFO4, /* shouldDelegate= */ false); + testSeverityLevel(processor, Severity.WARN, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.WARN2, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.WARN3, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.WARN4, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR2, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR3, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.ERROR4, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL2, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL3, /* shouldDelegate= */ true); + testSeverityLevel(processor, Severity.FATAL4, /* shouldDelegate= */ true); + } + + private void testSeverityLevel( + SeverityBasedLogRecordProcessor processor, Severity severity, boolean shouldDelegate) { + when(logRecord.getSeverity()).thenReturn(severity); + + processor.onEmit(context, logRecord); + + if (shouldDelegate) { + verify(delegate).onEmit(same(context), same(logRecord)); + } else { + verify(delegate, never()).onEmit(same(context), same(logRecord)); + } + + // Reset mock for next test + org.mockito.Mockito.reset(delegate); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + } + + @Test + void shutdown_DelegatesToProcessor() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); + + CompletableResultCode result = processor.shutdown(); + + verify(delegate).shutdown(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void forceFlush_DelegatesToProcessor() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(delegate).forceFlush(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void toString_Valid() { + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build(); + + String toString = processor.toString(); + assertThat(toString).contains("SeverityBasedLogRecordProcessor"); + assertThat(toString).contains("minimumSeverity=WARN"); + assertThat(toString).contains("delegate="); + } + + @Test + void shutdown_ProcessorFailure() { + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); + + CompletableResultCode result = processor.shutdown(); + + verify(delegate).shutdown(); + assertThat(result.isSuccess()).isFalse(); + } + + @Test + void forceFlush_ProcessorFailure() { + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); + + SeverityBasedLogRecordProcessor processor = + SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(delegate).forceFlush(); + assertThat(result.isSuccess()).isFalse(); + } +} diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java new file mode 100644 index 00000000000..baa46a804ba --- /dev/null +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java @@ -0,0 +1,193 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.logs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.api.trace.SpanId; +import io.opentelemetry.api.trace.TraceFlags; +import io.opentelemetry.api.trace.TraceId; +import io.opentelemetry.api.trace.TraceState; +import io.opentelemetry.context.Context; +import io.opentelemetry.sdk.common.CompletableResultCode; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class TraceBasedLogRecordProcessorTest { + + @Mock private LogRecordProcessor delegate; + @Mock private ReadWriteLogRecord logRecord; + + private Context context; + private SpanContext sampledSpanContext; + private SpanContext notSampledSpanContext; + private SpanContext invalidSpanContext; + + @BeforeEach + void setUp() { + context = Context.current(); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + + // Create sampled span context + sampledSpanContext = + SpanContext.create( + TraceId.fromLongs(1, 2), + SpanId.fromLong(3), + TraceFlags.getSampled(), + TraceState.getDefault()); + + // Create not sampled span context + notSampledSpanContext = + SpanContext.create( + TraceId.fromLongs(1, 2), + SpanId.fromLong(3), + TraceFlags.getDefault(), + TraceState.getDefault()); + + // Create invalid span context + invalidSpanContext = SpanContext.getInvalid(); + } + + @Test + void builder_RequiresProcessor() { + assertThatThrownBy(() -> TraceBasedLogRecordProcessor.builder(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("delegate"); + } + + @Test + void onEmit_SampledSpanContext_DelegatesToProcessor() { + when(logRecord.getSpanContext()).thenReturn(sampledSpanContext); + + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_NotSampledSpanContext_DoesNotDelegate() { + when(logRecord.getSpanContext()).thenReturn(notSampledSpanContext); + + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate, never()).onEmit(any(), any()); + } + + @Test + void onEmit_InvalidSpanContext_DelegatesToProcessor() { + when(logRecord.getSpanContext()).thenReturn(invalidSpanContext); + + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + processor.onEmit(context, logRecord); + + verify(delegate).onEmit(same(context), same(logRecord)); + } + + @Test + void onEmit_VariousSpanContexts() { + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + // Test sampled span context + testSpanContext(processor, sampledSpanContext, /* shouldDelegate= */ true); + + // Test not sampled span context + testSpanContext(processor, notSampledSpanContext, /* shouldDelegate= */ false); + + // Test invalid span context + testSpanContext(processor, invalidSpanContext, /* shouldDelegate= */ true); + } + + private void testSpanContext( + TraceBasedLogRecordProcessor processor, SpanContext spanContext, boolean shouldDelegate) { + when(logRecord.getSpanContext()).thenReturn(spanContext); + + processor.onEmit(context, logRecord); + + if (shouldDelegate) { + verify(delegate).onEmit(same(context), same(logRecord)); + } else { + verify(delegate, never()).onEmit(same(context), same(logRecord)); + } + + // Reset mock for next test + org.mockito.Mockito.reset(delegate); + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess()); + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess()); + } + + @Test + void shutdown_DelegatesToProcessor() { + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + CompletableResultCode result = processor.shutdown(); + + verify(delegate).shutdown(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void forceFlush_DelegatesToProcessor() { + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(delegate).forceFlush(); + assertThat(result.isSuccess()).isTrue(); + } + + @Test + void toString_Valid() { + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + String toString = processor.toString(); + assertThat(toString).contains("TraceBasedLogRecordProcessor"); + assertThat(toString).contains("delegate="); + } + + @Test + void shutdown_ProcessorFailure() { + when(delegate.shutdown()).thenReturn(CompletableResultCode.ofFailure()); + + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + CompletableResultCode result = processor.shutdown(); + + verify(delegate).shutdown(); + assertThat(result.isSuccess()).isFalse(); + } + + @Test + void forceFlush_ProcessorFailure() { + when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofFailure()); + + TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build(); + + CompletableResultCode result = processor.forceFlush(); + + verify(delegate).forceFlush(); + assertThat(result.isSuccess()).isFalse(); + } +}