Skip to content

Commit

Permalink
Add context argument to LogRecordProcessor#onEmit (open-telemetry#4889)
Browse files Browse the repository at this point in the history
* Add context argument to LogRecordProcessor#onEmit

* Change argument order
  • Loading branch information
jack-berg authored and dmarkwat committed Dec 30, 2022
1 parent effa638 commit 9e6461c
Show file tree
Hide file tree
Showing 14 changed files with 99 additions and 33 deletions.
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -51,9 +52,11 @@ static LogRecordProcessor composite(Iterable<LogRecordProcessor> processors) {
/**
* Called when a {@link Logger} {@link LogRecordBuilder#emit()}s a log record.
*
* @param context the context set via {@link LogRecordBuilder#setContext(Context)}, or {@link
* Context#current()} if not explicitly set
* @param logRecord the log record
*/
void onEmit(ReadWriteLogRecord logRecord);
void onEmit(Context context, ReadWriteLogRecord logRecord);

/**
* Shutdown the log processor.
Expand Down
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.sdk.logs;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -33,9 +34,9 @@ static LogRecordProcessor create(List<LogRecordProcessor> logRecordProcessorsLis
}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
for (LogRecordProcessor logRecordProcessor : logRecordProcessors) {
logRecordProcessor.onEmit(logRecord);
logRecordProcessor.onEmit(context, logRecord);
}
}

Expand Down
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.sdk.logs;

import io.opentelemetry.context.Context;

final class NoopLogRecordProcessor implements LogRecordProcessor {
private static final NoopLogRecordProcessor INSTANCE = new NoopLogRecordProcessor();

Expand All @@ -15,5 +17,5 @@ static LogRecordProcessor getInstance() {
private NoopLogRecordProcessor() {}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {}
public void onEmit(Context context, ReadWriteLogRecord logRecord) {}
}
Expand Up @@ -10,7 +10,6 @@
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.internal.AttributesMap;
Expand All @@ -27,7 +26,7 @@ final class SdkLogRecordBuilder implements EventBuilder {

private final InstrumentationScopeInfo instrumentationScopeInfo;
private long epochNanos;
private SpanContext spanContext = SpanContext.getInvalid();
@Nullable private Context context;
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
@Nullable private String severityText;
private Body body = Body.empty();
Expand All @@ -54,7 +53,7 @@ public SdkLogRecordBuilder setEpoch(Instant instant) {

@Override
public SdkLogRecordBuilder setContext(Context context) {
this.spanContext = Span.fromContext(context).getSpanContext();
this.context = context;
return this;
}

Expand Down Expand Up @@ -95,15 +94,17 @@ public void emit() {
if (loggerSharedState.hasBeenShutdown()) {
return;
}
Context context = this.context == null ? Context.current() : this.context;
loggerSharedState
.getLogRecordProcessor()
.onEmit(
context,
SdkReadWriteLogRecord.create(
loggerSharedState.getLogLimits(),
loggerSharedState.getResource(),
instrumentationScopeInfo,
this.epochNanos == 0 ? this.loggerSharedState.getClock().now() : this.epochNanos,
spanContext,
Span.fromContext(context).getSpanContext(),
severity,
severityText,
body,
Expand Down
Expand Up @@ -9,6 +9,7 @@

import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
Expand Down Expand Up @@ -57,8 +58,9 @@ public SdkLoggerProviderBuilder setLogLimits(Supplier<LogLimits> logLimitsSuppli
}

/**
* Add a log processor. {@link LogRecordProcessor#onEmit(ReadWriteLogRecord)} will be called each
* time a log is emitted by {@link Logger} instances obtained from the {@link SdkLoggerProvider}.
* Add a log processor. {@link LogRecordProcessor#onEmit(Context, ReadWriteLogRecord)} will be
* called each time a log is emitted by {@link Logger} instances obtained from the {@link
* SdkLoggerProvider}.
*
* @param processor the log processor
* @return this
Expand Down
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
Expand Down Expand Up @@ -81,7 +82,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
if (logRecord == null) {
return;
}
Expand Down
Expand Up @@ -7,6 +7,7 @@

import static java.util.Objects.requireNonNull;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
Expand Down Expand Up @@ -58,7 +59,7 @@ public static LogRecordProcessor create(LogRecordExporter exporter) {
}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
try {
List<LogRecordData> logs = Collections.singletonList(logRecord.toLogRecordData());
CompletableResultCode result = logRecordExporter.export(logs);
Expand Down
Expand Up @@ -10,6 +10,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -39,7 +40,7 @@ void setup() {
void empty() {
LogRecordProcessor multiLogRecordProcessor = LogRecordProcessor.composite();
assertThat(multiLogRecordProcessor).isInstanceOf(NoopLogRecordProcessor.class);
multiLogRecordProcessor.onEmit(logRecord);
multiLogRecordProcessor.onEmit(Context.current(), logRecord);
multiLogRecordProcessor.shutdown();
}

Expand All @@ -53,9 +54,10 @@ void oneLogRecordProcessor() {
void twoLogRecordProcessor() {
LogRecordProcessor multiLogRecordProcessor =
LogRecordProcessor.composite(logRecordProcessor1, logRecordProcessor2);
multiLogRecordProcessor.onEmit(logRecord);
verify(logRecordProcessor1).onEmit(same(logRecord));
verify(logRecordProcessor2).onEmit(same(logRecord));
Context context = Context.current();
multiLogRecordProcessor.onEmit(context, logRecord);
verify(logRecordProcessor1).onEmit(same(context), same(logRecord));
verify(logRecordProcessor2).onEmit(same(context), same(logRecord));

multiLogRecordProcessor.forceFlush();
verify(logRecordProcessor1).forceFlush();
Expand Down
Expand Up @@ -7,6 +7,7 @@

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import io.opentelemetry.context.Context;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -20,7 +21,7 @@ class NoopLogRecordProcessorTest {
@Test
void noCrash() {
LogRecordProcessor logRecordProcessor = NoopLogRecordProcessor.getInstance();
logRecordProcessor.onEmit(logRecord);
logRecordProcessor.onEmit(Context.current(), logRecord);
assertThat(logRecordProcessor.forceFlush().isSuccess()).isEqualTo(true);
assertThat(logRecordProcessor.shutdown().isSuccess()).isEqualTo(true);
}
Expand Down
Expand Up @@ -47,7 +47,8 @@ class SdkLogRecordBuilderTest {
@BeforeEach
void setup() {
when(loggerSharedState.getLogLimits()).thenReturn(LogLimits.getDefault());
when(loggerSharedState.getLogRecordProcessor()).thenReturn(emittedLog::set);
when(loggerSharedState.getLogRecordProcessor())
.thenReturn((context, logRecord) -> emittedLog.set(logRecord));
when(loggerSharedState.getResource()).thenReturn(RESOURCE);
when(loggerSharedState.getClock()).thenReturn(Clock.getDefault());

Expand Down
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -21,6 +22,8 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -29,6 +32,7 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.InstanceOfAssertFactories;
Expand Down Expand Up @@ -222,7 +226,7 @@ void loggerBuilder_WithLogRecordProcessor() {
SdkLoggerProvider.builder()
.setResource(resource)
.addLogRecordProcessor(
logRecord -> {
(unused, logRecord) -> {
logRecord.setAttribute(null, null);
// Overwrite k1
logRecord.setAttribute(AttributeKey.stringKey("k1"), "new-v1");
Expand Down Expand Up @@ -262,6 +266,50 @@ void loggerBuilder_WithLogRecordProcessor() {
Attributes.builder().put("k1", "new-v1").put("k2", "v2").put("k3", "v3").build());
}

@Test
void loggerBuilder_ProcessorWithContext() {
ContextKey<String> contextKey = ContextKey.named("my-context-key");
AtomicReference<LogRecordData> logRecordData = new AtomicReference<>();

sdkLoggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
(context, logRecord) ->
logRecord.setAttribute(
AttributeKey.stringKey("my-context-key"),
Optional.ofNullable(context.get(contextKey)).orElse("")))
.addLogRecordProcessor(
(unused, logRecord) -> logRecordData.set(logRecord.toLogRecordData()))
.build();

// With implicit context
try (Scope unused = Context.current().with(contextKey, "context-value1").makeCurrent()) {
sdkLoggerProvider
.loggerBuilder("test")
.build()
.logRecordBuilder()
.setBody("log message1")
.emit();
}
assertThat(logRecordData.get())
.hasBody("log message1")
.hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value1"));

// With explicit context
try (Scope unused = Context.current().with(contextKey, "context-value2").makeCurrent()) {
sdkLoggerProvider
.loggerBuilder("test")
.build()
.logRecordBuilder()
.setContext(Context.current())
.setBody("log message2")
.emit();
}
assertThat(logRecordData.get())
.hasBody("log message2")
.hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value2"));
}

@Test
void forceFlush() {
sdkLoggerProvider.forceFlush();
Expand All @@ -288,7 +336,7 @@ void canSetClock() {
Clock clock = mock(Clock.class);
when(clock.now()).thenReturn(now);
List<ReadWriteLogRecord> seenLogs = new ArrayList<>();
logRecordProcessor = seenLogs::add;
logRecordProcessor = (context, logRecord) -> seenLogs.add(logRecord);
sdkLoggerProvider =
SdkLoggerProvider.builder()
.setClock(clock)
Expand Down
Expand Up @@ -46,7 +46,7 @@ void logRecordBuilder() {
LoggerSharedState state = mock(LoggerSharedState.class);
InstrumentationScopeInfo info = InstrumentationScopeInfo.create("foo");
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
LogRecordProcessor logRecordProcessor = seenLog::set;
LogRecordProcessor logRecordProcessor = (context, logRecord) -> seenLog.set(logRecord);
Clock clock = mock(Clock.class);
when(clock.now()).thenReturn(5L);

Expand All @@ -69,7 +69,7 @@ void logRecordBuilder_maxAttributeLength() {
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(seenLog::set)
.addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord))
.setLogLimits(() -> LogLimits.builder().setMaxAttributeValueLength(maxLength).build())
.build();
LogRecordBuilder logRecordBuilder = loggerProvider.get("test").logRecordBuilder();
Expand Down Expand Up @@ -109,7 +109,7 @@ void logRecordBuilder_maxAttributes() {
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(seenLog::set)
.addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord))
.setLogLimits(
() -> LogLimits.builder().setMaxNumberOfAttributes(maxNumberOfAttrs).build())
.build();
Expand Down Expand Up @@ -140,15 +140,17 @@ void logRecordBuilder_AfterShutdown() {
loggerProvider.shutdown().join(10, TimeUnit.SECONDS);
loggerProvider.get("test").logRecordBuilder().emit();

verify(logRecordProcessor, never()).onEmit(any());
verify(logRecordProcessor, never()).onEmit(any(), any());
}

@Test
@SuppressLogger(loggerName = API_USAGE_LOGGER_NAME)
void eventBuilder() {
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder().addLogRecordProcessor(seenLog::set).build();
SdkLoggerProvider.builder()
.addLogRecordProcessor((context, logRecord) -> seenLog.set(logRecord))
.build();

// Emit event from logger with name and add event domain
loggerProvider
Expand Down
Expand Up @@ -291,7 +291,7 @@ void ignoresNullLogs() {
BatchLogRecordProcessor processor =
BatchLogRecordProcessor.builder(mockLogRecordExporter).build();
try {
assertThatCode(() -> processor.onEmit(null)).doesNotThrowAnyException();
assertThatCode(() -> processor.onEmit(null, null)).doesNotThrowAnyException();
} finally {
processor.shutdown();
}
Expand Down

0 comments on commit 9e6461c

Please sign in to comment.