From 9e6461c1235bf99be023e4e544cfa49fa899f220 Mon Sep 17 00:00:00 2001 From: jack-berg <34418638+jack-berg@users.noreply.github.com> Date: Wed, 9 Nov 2022 11:59:06 -0600 Subject: [PATCH] Add context argument to LogRecordProcessor#onEmit (#4889) * Add context argument to LogRecordProcessor#onEmit * Change argument order --- .../sdk/logs/LogRecordProcessor.java | 5 +- .../sdk/logs/MultiLogRecordProcessor.java | 5 +- .../sdk/logs/NoopLogRecordProcessor.java | 4 +- .../sdk/logs/SdkLogRecordBuilder.java | 9 ++-- .../sdk/logs/SdkLoggerProviderBuilder.java | 6 ++- .../logs/export/BatchLogRecordProcessor.java | 3 +- .../logs/export/SimpleLogRecordProcessor.java | 3 +- .../sdk/logs/MultiLogRecordProcessorTest.java | 10 ++-- .../sdk/logs/NoopLogRecordProcessorTest.java | 3 +- .../sdk/logs/SdkLogRecordBuilderTest.java | 3 +- .../sdk/logs/SdkLoggerProviderTest.java | 52 ++++++++++++++++++- .../opentelemetry/sdk/logs/SdkLoggerTest.java | 12 +++-- .../export/BatchLogRecordProcessorTest.java | 2 +- .../export/SimpleLogRecordProcessorTest.java | 15 +++--- 14 files changed, 99 insertions(+), 33 deletions(-) diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordProcessor.java index 6dcdf279fc6..df46275ba58 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogRecordProcessor.java @@ -7,6 +7,7 @@ import io.opentelemetry.api.logs.LogRecordBuilder; import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; import java.io.Closeable; import java.util.ArrayList; @@ -51,9 +52,11 @@ static LogRecordProcessor composite(Iterable processors) { /** * Called when a {@link Logger} {@link LogRecordBuilder#emit()}s a log record. * + * @param context the context set via {@link LogRecordBuilder#setContext(Context)}, or {@link + * Context#current()} if not explicitly set * @param logRecord the log record */ - void onEmit(ReadWriteLogRecord logRecord); + void onEmit(Context context, ReadWriteLogRecord logRecord); /** * Shutdown the log processor. diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessor.java index 25fe6a5049d..e84b38c86b6 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessor.java @@ -5,6 +5,7 @@ package io.opentelemetry.sdk.logs; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; import java.util.ArrayList; import java.util.List; @@ -33,9 +34,9 @@ static LogRecordProcessor create(List logRecordProcessorsLis } @Override - public void onEmit(ReadWriteLogRecord logRecord) { + public void onEmit(Context context, ReadWriteLogRecord logRecord) { for (LogRecordProcessor logRecordProcessor : logRecordProcessors) { - logRecordProcessor.onEmit(logRecord); + logRecordProcessor.onEmit(context, logRecord); } } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessor.java index 24af6885a69..518cdfa785a 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessor.java @@ -5,6 +5,8 @@ package io.opentelemetry.sdk.logs; +import io.opentelemetry.context.Context; + final class NoopLogRecordProcessor implements LogRecordProcessor { private static final NoopLogRecordProcessor INSTANCE = new NoopLogRecordProcessor(); @@ -15,5 +17,5 @@ static LogRecordProcessor getInstance() { private NoopLogRecordProcessor() {} @Override - public void onEmit(ReadWriteLogRecord logRecord) {} + public void onEmit(Context context, ReadWriteLogRecord logRecord) {} } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java index 0c333cc6fc3..e1f159aff3d 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilder.java @@ -10,7 +10,6 @@ import io.opentelemetry.api.logs.LogRecordBuilder; import io.opentelemetry.api.logs.Severity; import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.internal.AttributesMap; @@ -27,7 +26,7 @@ final class SdkLogRecordBuilder implements EventBuilder { private final InstrumentationScopeInfo instrumentationScopeInfo; private long epochNanos; - private SpanContext spanContext = SpanContext.getInvalid(); + @Nullable private Context context; private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER; @Nullable private String severityText; private Body body = Body.empty(); @@ -54,7 +53,7 @@ public SdkLogRecordBuilder setEpoch(Instant instant) { @Override public SdkLogRecordBuilder setContext(Context context) { - this.spanContext = Span.fromContext(context).getSpanContext(); + this.context = context; return this; } @@ -95,15 +94,17 @@ public void emit() { if (loggerSharedState.hasBeenShutdown()) { return; } + Context context = this.context == null ? Context.current() : this.context; loggerSharedState .getLogRecordProcessor() .onEmit( + context, SdkReadWriteLogRecord.create( loggerSharedState.getLogLimits(), loggerSharedState.getResource(), instrumentationScopeInfo, this.epochNanos == 0 ? this.loggerSharedState.getClock().now() : this.epochNanos, - spanContext, + Span.fromContext(context).getSpanContext(), severity, severityText, body, diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java index 6f99640cf18..0b2ed52a753 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/SdkLoggerProviderBuilder.java @@ -9,6 +9,7 @@ import io.opentelemetry.api.logs.LogRecordBuilder; import io.opentelemetry.api.logs.Logger; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.logs.data.LogRecordData; import io.opentelemetry.sdk.resources.Resource; @@ -57,8 +58,9 @@ public SdkLoggerProviderBuilder setLogLimits(Supplier logLimitsSuppli } /** - * Add a log processor. {@link LogRecordProcessor#onEmit(ReadWriteLogRecord)} will be called each - * time a log is emitted by {@link Logger} instances obtained from the {@link SdkLoggerProvider}. + * Add a log processor. {@link LogRecordProcessor#onEmit(Context, ReadWriteLogRecord)} will be + * called each time a log is emitted by {@link Logger} instances obtained from the {@link + * SdkLoggerProvider}. * * @param processor the log processor * @return this diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java index 1c92cad0af5..f327b3b4acc 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessor.java @@ -10,6 +10,7 @@ import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.MeterProvider; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.internal.DaemonThreadFactory; import io.opentelemetry.sdk.logs.LogRecordProcessor; @@ -81,7 +82,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord } @Override - public void onEmit(ReadWriteLogRecord logRecord) { + public void onEmit(Context context, ReadWriteLogRecord logRecord) { if (logRecord == null) { return; } diff --git a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java index b7703d4cde1..3bb510718aa 100644 --- a/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java +++ b/sdk/logs/src/main/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessor.java @@ -7,6 +7,7 @@ import static java.util.Objects.requireNonNull; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.LogRecordProcessor; import io.opentelemetry.sdk.logs.ReadWriteLogRecord; @@ -58,7 +59,7 @@ public static LogRecordProcessor create(LogRecordExporter exporter) { } @Override - public void onEmit(ReadWriteLogRecord logRecord) { + public void onEmit(Context context, ReadWriteLogRecord logRecord) { try { List logs = Collections.singletonList(logRecord.toLogRecordData()); CompletableResultCode result = logRecordExporter.export(logs); diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessorTest.java index 8a5a32e72b0..57aaa61fc32 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/MultiLogRecordProcessorTest.java @@ -10,6 +10,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.common.CompletableResultCode; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -39,7 +40,7 @@ void setup() { void empty() { LogRecordProcessor multiLogRecordProcessor = LogRecordProcessor.composite(); assertThat(multiLogRecordProcessor).isInstanceOf(NoopLogRecordProcessor.class); - multiLogRecordProcessor.onEmit(logRecord); + multiLogRecordProcessor.onEmit(Context.current(), logRecord); multiLogRecordProcessor.shutdown(); } @@ -53,9 +54,10 @@ void oneLogRecordProcessor() { void twoLogRecordProcessor() { LogRecordProcessor multiLogRecordProcessor = LogRecordProcessor.composite(logRecordProcessor1, logRecordProcessor2); - multiLogRecordProcessor.onEmit(logRecord); - verify(logRecordProcessor1).onEmit(same(logRecord)); - verify(logRecordProcessor2).onEmit(same(logRecord)); + Context context = Context.current(); + multiLogRecordProcessor.onEmit(context, logRecord); + verify(logRecordProcessor1).onEmit(same(context), same(logRecord)); + verify(logRecordProcessor2).onEmit(same(context), same(logRecord)); multiLogRecordProcessor.forceFlush(); verify(logRecordProcessor1).forceFlush(); diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessorTest.java index c80cb15cbcf..3a209e02041 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/NoopLogRecordProcessorTest.java @@ -7,6 +7,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import io.opentelemetry.context.Context; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -20,7 +21,7 @@ class NoopLogRecordProcessorTest { @Test void noCrash() { LogRecordProcessor logRecordProcessor = NoopLogRecordProcessor.getInstance(); - logRecordProcessor.onEmit(logRecord); + logRecordProcessor.onEmit(Context.current(), logRecord); assertThat(logRecordProcessor.forceFlush().isSuccess()).isEqualTo(true); assertThat(logRecordProcessor.shutdown().isSuccess()).isEqualTo(true); } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java index 9775f2f4900..3f530d6057d 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLogRecordBuilderTest.java @@ -47,7 +47,8 @@ class SdkLogRecordBuilderTest { @BeforeEach void setup() { when(loggerSharedState.getLogLimits()).thenReturn(LogLimits.getDefault()); - when(loggerSharedState.getLogRecordProcessor()).thenReturn(emittedLog::set); + when(loggerSharedState.getLogRecordProcessor()) + .thenReturn((context, logRecord) -> emittedLog.set(logRecord)); when(loggerSharedState.getResource()).thenReturn(RESOURCE); when(loggerSharedState.getClock()).thenReturn(Clock.getDefault()); diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderTest.java index 2eaf70bc32a..7408fdfc8dc 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerProviderTest.java @@ -7,6 +7,7 @@ import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat; import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.entry; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -21,6 +22,8 @@ import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.context.Scope; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.Clock; import io.opentelemetry.sdk.common.CompletableResultCode; @@ -29,6 +32,7 @@ import io.opentelemetry.sdk.resources.Resource; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.InstanceOfAssertFactories; @@ -222,7 +226,7 @@ void loggerBuilder_WithLogRecordProcessor() { SdkLoggerProvider.builder() .setResource(resource) .addLogRecordProcessor( - logRecord -> { + (unused, logRecord) -> { logRecord.setAttribute(null, null); // Overwrite k1 logRecord.setAttribute(AttributeKey.stringKey("k1"), "new-v1"); @@ -262,6 +266,50 @@ void loggerBuilder_WithLogRecordProcessor() { Attributes.builder().put("k1", "new-v1").put("k2", "v2").put("k3", "v3").build()); } + @Test + void loggerBuilder_ProcessorWithContext() { + ContextKey contextKey = ContextKey.named("my-context-key"); + AtomicReference logRecordData = new AtomicReference<>(); + + sdkLoggerProvider = + SdkLoggerProvider.builder() + .addLogRecordProcessor( + (context, logRecord) -> + logRecord.setAttribute( + AttributeKey.stringKey("my-context-key"), + Optional.ofNullable(context.get(contextKey)).orElse(""))) + .addLogRecordProcessor( + (unused, logRecord) -> logRecordData.set(logRecord.toLogRecordData())) + .build(); + + // With implicit context + try (Scope unused = Context.current().with(contextKey, "context-value1").makeCurrent()) { + sdkLoggerProvider + .loggerBuilder("test") + .build() + .logRecordBuilder() + .setBody("log message1") + .emit(); + } + assertThat(logRecordData.get()) + .hasBody("log message1") + .hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value1")); + + // With explicit context + try (Scope unused = Context.current().with(contextKey, "context-value2").makeCurrent()) { + sdkLoggerProvider + .loggerBuilder("test") + .build() + .logRecordBuilder() + .setContext(Context.current()) + .setBody("log message2") + .emit(); + } + assertThat(logRecordData.get()) + .hasBody("log message2") + .hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value2")); + } + @Test void forceFlush() { sdkLoggerProvider.forceFlush(); @@ -288,7 +336,7 @@ void canSetClock() { Clock clock = mock(Clock.class); when(clock.now()).thenReturn(now); List seenLogs = new ArrayList<>(); - logRecordProcessor = seenLogs::add; + logRecordProcessor = (context, logRecord) -> seenLogs.add(logRecord); sdkLoggerProvider = SdkLoggerProvider.builder() .setClock(clock) diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java index b35b6b926a3..e96820132e4 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/SdkLoggerTest.java @@ -46,7 +46,7 @@ void logRecordBuilder() { LoggerSharedState state = mock(LoggerSharedState.class); InstrumentationScopeInfo info = InstrumentationScopeInfo.create("foo"); AtomicReference seenLog = new AtomicReference<>(); - LogRecordProcessor logRecordProcessor = seenLog::set; + LogRecordProcessor logRecordProcessor = (context, logRecord) -> seenLog.set(logRecord); Clock clock = mock(Clock.class); when(clock.now()).thenReturn(5L); @@ -69,7 +69,7 @@ void logRecordBuilder_maxAttributeLength() { AtomicReference seenLog = new AtomicReference<>(); SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder() - .addLogRecordProcessor(seenLog::set) + .addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord)) .setLogLimits(() -> LogLimits.builder().setMaxAttributeValueLength(maxLength).build()) .build(); LogRecordBuilder logRecordBuilder = loggerProvider.get("test").logRecordBuilder(); @@ -109,7 +109,7 @@ void logRecordBuilder_maxAttributes() { AtomicReference seenLog = new AtomicReference<>(); SdkLoggerProvider loggerProvider = SdkLoggerProvider.builder() - .addLogRecordProcessor(seenLog::set) + .addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord)) .setLogLimits( () -> LogLimits.builder().setMaxNumberOfAttributes(maxNumberOfAttrs).build()) .build(); @@ -140,7 +140,7 @@ void logRecordBuilder_AfterShutdown() { loggerProvider.shutdown().join(10, TimeUnit.SECONDS); loggerProvider.get("test").logRecordBuilder().emit(); - verify(logRecordProcessor, never()).onEmit(any()); + verify(logRecordProcessor, never()).onEmit(any(), any()); } @Test @@ -148,7 +148,9 @@ void logRecordBuilder_AfterShutdown() { void eventBuilder() { AtomicReference seenLog = new AtomicReference<>(); SdkLoggerProvider loggerProvider = - SdkLoggerProvider.builder().addLogRecordProcessor(seenLog::set).build(); + SdkLoggerProvider.builder() + .addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord)) + .build(); // Emit event from logger with name and add event domain loggerProvider diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java index 32cfecbed17..1039b7e34bc 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/BatchLogRecordProcessorTest.java @@ -291,7 +291,7 @@ void ignoresNullLogs() { BatchLogRecordProcessor processor = BatchLogRecordProcessor.builder(mockLogRecordExporter).build(); try { - assertThatCode(() -> processor.onEmit(null)).doesNotThrowAnyException(); + assertThatCode(() -> processor.onEmit(null, null)).doesNotThrowAnyException(); } finally { processor.shutdown(); } diff --git a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorTest.java b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorTest.java index 3ce15654092..1550c68b3ef 100644 --- a/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorTest.java +++ b/sdk/logs/src/test/java/io/opentelemetry/sdk/logs/export/SimpleLogRecordProcessorTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import io.opentelemetry.context.Context; import io.opentelemetry.internal.testing.slf4j.SuppressLogger; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.logs.LogRecordProcessor; @@ -58,7 +59,7 @@ void create_NullExporter() { @Test void onEmit() { - logRecordProcessor.onEmit(readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); verify(logRecordExporter).export(Collections.singletonList(LOG_RECORD_DATA)); } @@ -66,8 +67,8 @@ void onEmit() { @SuppressLogger(SimpleLogRecordProcessor.class) void onEmit_ExporterError() { when(logRecordExporter.export(any())).thenThrow(new RuntimeException("Exporter error!")); - logRecordProcessor.onEmit(readWriteLogRecord); - logRecordProcessor.onEmit(readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); verify(logRecordExporter, times(2)).export(anyList()); } @@ -78,8 +79,8 @@ void forceFlush() { when(logRecordExporter.export(any())).thenReturn(export1, export2); - logRecordProcessor.onEmit(readWriteLogRecord); - logRecordProcessor.onEmit(readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); verify(logRecordExporter, times(2)).export(Collections.singletonList(LOG_RECORD_DATA)); @@ -101,8 +102,8 @@ void shutdown() { when(logRecordExporter.export(any())).thenReturn(export1, export2); - logRecordProcessor.onEmit(readWriteLogRecord); - logRecordProcessor.onEmit(readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); + logRecordProcessor.onEmit(Context.current(), readWriteLogRecord); verify(logRecordExporter, times(2)).export(Collections.singletonList(LOG_RECORD_DATA));