diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java index a4b336b8b94..542ca027c89 100644 --- a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/DeclarativeConfiguration.java @@ -15,8 +15,10 @@ import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.autoconfigure.internal.SpiHelper; import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.LogRecordProcessorModel; import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.OpenTelemetryConfigurationModel; import io.opentelemetry.sdk.extension.incubator.fileconfig.internal.model.SamplerModel; +import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.trace.samplers.Sampler; import java.io.Closeable; import java.io.IOException; @@ -210,6 +212,29 @@ public static Sampler createSampler(DeclarativeConfigProperties genericSamplerMo samplerModel); } + /** + * Create a {@link LogRecordProcessor} from the {@code logRecordProcessorModel} representing the + * log record processor config. + * + *
This is used when log record processors are composed, with one processor accepting one or + * more additional processors as config properties. The {@link ComponentProvider} implementation + * can call this to configure a delegate {@link LogRecordProcessor} from the {@link + * DeclarativeConfigProperties} corresponding to a particular config property. + */ + public static LogRecordProcessor createLogRecordProcessor( + DeclarativeConfigProperties genericLogRecordProcessorModel) { + YamlDeclarativeConfigProperties yamlDeclarativeConfigProperties = + requireYamlDeclarativeConfigProperties(genericLogRecordProcessorModel); + LogRecordProcessorModel logRecordProcessorModel = + MAPPER.convertValue( + DeclarativeConfigProperties.toMap(yamlDeclarativeConfigProperties), + LogRecordProcessorModel.class); + return createAndMaybeCleanup( + LogRecordProcessorFactory.getInstance(), + SpiHelper.create(yamlDeclarativeConfigProperties.getComponentLoader()), + logRecordProcessorModel); + } + private static YamlDeclarativeConfigProperties requireYamlDeclarativeConfigProperties( DeclarativeConfigProperties declarativeConfigProperties) { if (!(declarativeConfigProperties instanceof YamlDeclarativeConfigProperties)) { diff --git a/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java new file mode 100644 index 00000000000..22f06fe495c --- /dev/null +++ b/sdk-extensions/incubator/src/main/java/io/opentelemetry/sdk/extension/incubator/fileconfig/internal/SeverityBasedLogRecordProcessorComponentProvider.java @@ -0,0 +1,58 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.sdk.extension.incubator.fileconfig.internal; + +import io.opentelemetry.api.incubator.config.DeclarativeConfigProperties; +import io.opentelemetry.api.logs.Severity; +import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider; +import io.opentelemetry.sdk.extension.incubator.fileconfig.DeclarativeConfiguration; +import io.opentelemetry.sdk.logs.LogRecordProcessor; +import io.opentelemetry.sdk.logs.SeverityBasedLogRecordProcessor; + +/** + * ComponentProvider for SeverityBasedLogRecordProcessor to support declarative configuration. + * + *
This class is internal and is hence not for public use. Its APIs are unstable and can change
+ * at any time.
+ */
+public class SeverityBasedLogRecordProcessorComponentProvider
+ implements ComponentProvider This class is internal and is hence not for public use. Its APIs are unstable and can change
+ * at any time.
+ */
+public class TraceBasedLogRecordProcessorComponentProvider
+ implements ComponentProvider Only log records with severity greater than or equal to the configured minimum are forwarded.
+ */
+public final class SeverityBasedLogRecordProcessor implements LogRecordProcessor {
+
+ private final Severity minimumSeverity;
+ private final LogRecordProcessor delegate;
+
+ SeverityBasedLogRecordProcessor(Severity minimumSeverity, LogRecordProcessor delegate) {
+ this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity");
+ this.delegate = requireNonNull(delegate, "delegate");
+ }
+
+ /**
+ * Returns a new {@link SeverityBasedLogRecordProcessorBuilder} to construct a {@link
+ * SeverityBasedLogRecordProcessor}.
+ *
+ * @param minimumSeverity the minimum severity level required for processing
+ * @param delegate the processor to delegate to
+ * @return a new {@link SeverityBasedLogRecordProcessorBuilder}
+ */
+ public static SeverityBasedLogRecordProcessorBuilder builder(
+ Severity minimumSeverity, LogRecordProcessor delegate) {
+ return new SeverityBasedLogRecordProcessorBuilder(minimumSeverity, delegate);
+ }
+
+ @Override
+ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
+ if (logRecord.getSeverity().getSeverityNumber() >= minimumSeverity.getSeverityNumber()) {
+ delegate.onEmit(context, logRecord);
+ }
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ return delegate.shutdown();
+ }
+
+ @Override
+ public CompletableResultCode forceFlush() {
+ return delegate.forceFlush();
+ }
+
+ @Override
+ public String toString() {
+ return "SeverityBasedLogRecordProcessor{"
+ + "minimumSeverity="
+ + minimumSeverity
+ + ", delegate="
+ + delegate
+ + '}';
+ }
+}
diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java
new file mode 100644
index 00000000000..9e99eedd14e
--- /dev/null
+++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorBuilder.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.logs;
+
+import static java.util.Objects.requireNonNull;
+
+import io.opentelemetry.api.logs.Severity;
+
+/** Builder class for {@link SeverityBasedLogRecordProcessor}. */
+public final class SeverityBasedLogRecordProcessorBuilder {
+
+ private final Severity minimumSeverity;
+ private final LogRecordProcessor delegate;
+
+ SeverityBasedLogRecordProcessorBuilder(Severity minimumSeverity, LogRecordProcessor delegate) {
+ this.minimumSeverity = requireNonNull(minimumSeverity, "minimumSeverity");
+ this.delegate = requireNonNull(delegate, "delegate");
+ }
+
+ /**
+ * Returns a new {@link SeverityBasedLogRecordProcessor} with the configuration of this builder.
+ *
+ * @return a new {@link SeverityBasedLogRecordProcessor}
+ */
+ public SeverityBasedLogRecordProcessor build() {
+ return new SeverityBasedLogRecordProcessor(minimumSeverity, delegate);
+ }
+}
diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java
new file mode 100644
index 00000000000..c7130f1a568
--- /dev/null
+++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessor.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.logs;
+
+import static java.util.Objects.requireNonNull;
+
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+
+/**
+ * A {@link LogRecordProcessor} that filters out log records associated with sampled out spans.
+ *
+ * Log records not tied to any span (invalid span context) are not sampled out.
+ */
+public final class TraceBasedLogRecordProcessor implements LogRecordProcessor {
+
+ private final LogRecordProcessor delegate;
+
+ TraceBasedLogRecordProcessor(LogRecordProcessor delegate) {
+ this.delegate = requireNonNull(delegate, "delegate");
+ }
+
+ /**
+ * Returns a new {@link TraceBasedLogRecordProcessorBuilder} to construct a {@link
+ * TraceBasedLogRecordProcessor}.
+ *
+ * @param delegate the processor to delegate to
+ * @return a new {@link TraceBasedLogRecordProcessorBuilder}
+ */
+ public static TraceBasedLogRecordProcessorBuilder builder(LogRecordProcessor delegate) {
+ return new TraceBasedLogRecordProcessorBuilder(delegate);
+ }
+
+ @Override
+ public void onEmit(Context context, ReadWriteLogRecord logRecord) {
+ if (logRecord.getSpanContext().isValid() && !logRecord.getSpanContext().isSampled()) {
+ return;
+ }
+ delegate.onEmit(context, logRecord);
+ }
+
+ @Override
+ public CompletableResultCode shutdown() {
+ return delegate.shutdown();
+ }
+
+ @Override
+ public CompletableResultCode forceFlush() {
+ return delegate.forceFlush();
+ }
+
+ @Override
+ public String toString() {
+ return "TraceBasedLogRecordProcessor{" + "delegate=" + delegate + '}';
+ }
+}
diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java
new file mode 100644
index 00000000000..a8d9f505692
--- /dev/null
+++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorBuilder.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.logs;
+
+import static java.util.Objects.requireNonNull;
+
+/** Builder class for {@link TraceBasedLogRecordProcessor}. */
+public final class TraceBasedLogRecordProcessorBuilder {
+
+ private final LogRecordProcessor delegate;
+
+ TraceBasedLogRecordProcessorBuilder(LogRecordProcessor delegate) {
+ this.delegate = requireNonNull(delegate, "delegate");
+ }
+
+ /**
+ * Returns a new {@link TraceBasedLogRecordProcessor} with the configuration of this builder.
+ *
+ * @return a new {@link TraceBasedLogRecordProcessor}
+ */
+ public TraceBasedLogRecordProcessor build() {
+ return new TraceBasedLogRecordProcessor(delegate);
+ }
+}
diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java
new file mode 100644
index 00000000000..d00b3f295b2
--- /dev/null
+++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SeverityBasedLogRecordProcessorTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.logs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.opentelemetry.api.logs.Severity;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class SeverityBasedLogRecordProcessorTest {
+
+ @Mock private LogRecordProcessor delegate;
+ @Mock private ReadWriteLogRecord logRecord;
+
+ private Context context;
+
+ @BeforeEach
+ void setUp() {
+ context = Context.current();
+ when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+ when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
+ }
+
+ @Test
+ void builder_RequiresMinimumSeverity() {
+ assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(null, delegate))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("minimumSeverity");
+ }
+
+ @Test
+ void builder_RequiresProcessor() {
+ assertThatThrownBy(() -> SeverityBasedLogRecordProcessor.builder(Severity.INFO, null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("delegate");
+ }
+
+ @Test
+ void onEmit_SeverityMeetsMinimum_DelegatesToProcessor() {
+ when(logRecord.getSeverity()).thenReturn(Severity.WARN);
+
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate).onEmit(same(context), same(logRecord));
+ }
+
+ @Test
+ void onEmit_SeverityAboveMinimum_DelegatesToProcessor() {
+ when(logRecord.getSeverity()).thenReturn(Severity.ERROR);
+
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate).onEmit(same(context), same(logRecord));
+ }
+
+ @Test
+ void onEmit_SeverityBelowMinimum_DoesNotDelegate() {
+ when(logRecord.getSeverity()).thenReturn(Severity.DEBUG);
+
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate, never()).onEmit(any(), any());
+ }
+
+ @Test
+ void onEmit_UndefinedSeverity_DoesNotDelegate() {
+ when(logRecord.getSeverity()).thenReturn(Severity.UNDEFINED_SEVERITY_NUMBER);
+
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate, never()).onEmit(any(), any());
+ }
+
+ @Test
+ void onEmit_VariousSeverityLevels() {
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build();
+
+ // Test all severity levels
+ testSeverityLevel(processor, Severity.UNDEFINED_SEVERITY_NUMBER, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.TRACE, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.TRACE2, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.TRACE3, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.TRACE4, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.DEBUG, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.DEBUG2, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.DEBUG3, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.DEBUG4, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.INFO, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.INFO2, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.INFO3, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.INFO4, /* shouldDelegate= */ false);
+ testSeverityLevel(processor, Severity.WARN, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.WARN2, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.WARN3, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.WARN4, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.ERROR, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.ERROR2, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.ERROR3, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.ERROR4, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.FATAL, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.FATAL2, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.FATAL3, /* shouldDelegate= */ true);
+ testSeverityLevel(processor, Severity.FATAL4, /* shouldDelegate= */ true);
+ }
+
+ private void testSeverityLevel(
+ SeverityBasedLogRecordProcessor processor, Severity severity, boolean shouldDelegate) {
+ when(logRecord.getSeverity()).thenReturn(severity);
+
+ processor.onEmit(context, logRecord);
+
+ if (shouldDelegate) {
+ verify(delegate).onEmit(same(context), same(logRecord));
+ } else {
+ verify(delegate, never()).onEmit(same(context), same(logRecord));
+ }
+
+ // Reset mock for next test
+ org.mockito.Mockito.reset(delegate);
+ when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+ when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
+ }
+
+ @Test
+ void shutdown_DelegatesToProcessor() {
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build();
+
+ CompletableResultCode result = processor.shutdown();
+
+ verify(delegate).shutdown();
+ assertThat(result.isSuccess()).isTrue();
+ }
+
+ @Test
+ void forceFlush_DelegatesToProcessor() {
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build();
+
+ CompletableResultCode result = processor.forceFlush();
+
+ verify(delegate).forceFlush();
+ assertThat(result.isSuccess()).isTrue();
+ }
+
+ @Test
+ void toString_Valid() {
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.WARN, delegate).build();
+
+ String toString = processor.toString();
+ assertThat(toString).contains("SeverityBasedLogRecordProcessor");
+ assertThat(toString).contains("minimumSeverity=WARN");
+ assertThat(toString).contains("delegate=");
+ }
+
+ @Test
+ void shutdown_ProcessorFailure() {
+ when(delegate.shutdown()).thenReturn(CompletableResultCode.ofFailure());
+
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build();
+
+ CompletableResultCode result = processor.shutdown();
+
+ verify(delegate).shutdown();
+ assertThat(result.isSuccess()).isFalse();
+ }
+
+ @Test
+ void forceFlush_ProcessorFailure() {
+ when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofFailure());
+
+ SeverityBasedLogRecordProcessor processor =
+ SeverityBasedLogRecordProcessor.builder(Severity.INFO, delegate).build();
+
+ CompletableResultCode result = processor.forceFlush();
+
+ verify(delegate).forceFlush();
+ assertThat(result.isSuccess()).isFalse();
+ }
+}
diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java
new file mode 100644
index 00000000000..baa46a804ba
--- /dev/null
+++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/TraceBasedLogRecordProcessorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright The OpenTelemetry Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package io.opentelemetry.sdk.logs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.same;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanId;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceId;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.LENIENT)
+class TraceBasedLogRecordProcessorTest {
+
+ @Mock private LogRecordProcessor delegate;
+ @Mock private ReadWriteLogRecord logRecord;
+
+ private Context context;
+ private SpanContext sampledSpanContext;
+ private SpanContext notSampledSpanContext;
+ private SpanContext invalidSpanContext;
+
+ @BeforeEach
+ void setUp() {
+ context = Context.current();
+ when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+ when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
+
+ // Create sampled span context
+ sampledSpanContext =
+ SpanContext.create(
+ TraceId.fromLongs(1, 2),
+ SpanId.fromLong(3),
+ TraceFlags.getSampled(),
+ TraceState.getDefault());
+
+ // Create not sampled span context
+ notSampledSpanContext =
+ SpanContext.create(
+ TraceId.fromLongs(1, 2),
+ SpanId.fromLong(3),
+ TraceFlags.getDefault(),
+ TraceState.getDefault());
+
+ // Create invalid span context
+ invalidSpanContext = SpanContext.getInvalid();
+ }
+
+ @Test
+ void builder_RequiresProcessor() {
+ assertThatThrownBy(() -> TraceBasedLogRecordProcessor.builder(null))
+ .isInstanceOf(NullPointerException.class)
+ .hasMessageContaining("delegate");
+ }
+
+ @Test
+ void onEmit_SampledSpanContext_DelegatesToProcessor() {
+ when(logRecord.getSpanContext()).thenReturn(sampledSpanContext);
+
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate).onEmit(same(context), same(logRecord));
+ }
+
+ @Test
+ void onEmit_NotSampledSpanContext_DoesNotDelegate() {
+ when(logRecord.getSpanContext()).thenReturn(notSampledSpanContext);
+
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate, never()).onEmit(any(), any());
+ }
+
+ @Test
+ void onEmit_InvalidSpanContext_DelegatesToProcessor() {
+ when(logRecord.getSpanContext()).thenReturn(invalidSpanContext);
+
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ processor.onEmit(context, logRecord);
+
+ verify(delegate).onEmit(same(context), same(logRecord));
+ }
+
+ @Test
+ void onEmit_VariousSpanContexts() {
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ // Test sampled span context
+ testSpanContext(processor, sampledSpanContext, /* shouldDelegate= */ true);
+
+ // Test not sampled span context
+ testSpanContext(processor, notSampledSpanContext, /* shouldDelegate= */ false);
+
+ // Test invalid span context
+ testSpanContext(processor, invalidSpanContext, /* shouldDelegate= */ true);
+ }
+
+ private void testSpanContext(
+ TraceBasedLogRecordProcessor processor, SpanContext spanContext, boolean shouldDelegate) {
+ when(logRecord.getSpanContext()).thenReturn(spanContext);
+
+ processor.onEmit(context, logRecord);
+
+ if (shouldDelegate) {
+ verify(delegate).onEmit(same(context), same(logRecord));
+ } else {
+ verify(delegate, never()).onEmit(same(context), same(logRecord));
+ }
+
+ // Reset mock for next test
+ org.mockito.Mockito.reset(delegate);
+ when(delegate.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
+ when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
+ }
+
+ @Test
+ void shutdown_DelegatesToProcessor() {
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ CompletableResultCode result = processor.shutdown();
+
+ verify(delegate).shutdown();
+ assertThat(result.isSuccess()).isTrue();
+ }
+
+ @Test
+ void forceFlush_DelegatesToProcessor() {
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ CompletableResultCode result = processor.forceFlush();
+
+ verify(delegate).forceFlush();
+ assertThat(result.isSuccess()).isTrue();
+ }
+
+ @Test
+ void toString_Valid() {
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ String toString = processor.toString();
+ assertThat(toString).contains("TraceBasedLogRecordProcessor");
+ assertThat(toString).contains("delegate=");
+ }
+
+ @Test
+ void shutdown_ProcessorFailure() {
+ when(delegate.shutdown()).thenReturn(CompletableResultCode.ofFailure());
+
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ CompletableResultCode result = processor.shutdown();
+
+ verify(delegate).shutdown();
+ assertThat(result.isSuccess()).isFalse();
+ }
+
+ @Test
+ void forceFlush_ProcessorFailure() {
+ when(delegate.forceFlush()).thenReturn(CompletableResultCode.ofFailure());
+
+ TraceBasedLogRecordProcessor processor = TraceBasedLogRecordProcessor.builder(delegate).build();
+
+ CompletableResultCode result = processor.forceFlush();
+
+ verify(delegate).forceFlush();
+ assertThat(result.isSuccess()).isFalse();
+ }
+}