Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add context argument to LogRecordProcessor#onEmit #4889

Merged
merged 2 commits into from Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -7,6 +7,7 @@

import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.io.Closeable;
import java.util.ArrayList;
Expand Down Expand Up @@ -52,8 +53,10 @@ static LogRecordProcessor composite(Iterable<LogRecordProcessor> processors) {
* Called when a {@link Logger} {@link LogRecordBuilder#emit()}s a log record.
*
* @param logRecord the log record
* @param context the context set via {@link LogRecordBuilder#setContext(Context)}, or {@link
* Context#current()} if not explicitly set
*/
void onEmit(ReadWriteLogRecord logRecord);
void onEmit(ReadWriteLogRecord logRecord, Context context);
jack-berg marked this conversation as resolved.
Show resolved Hide resolved

/**
* Shutdown the log processor.
Expand Down
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.sdk.logs;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -33,9 +34,9 @@ static LogRecordProcessor create(List<LogRecordProcessor> logRecordProcessorsLis
}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {
public void onEmit(ReadWriteLogRecord logRecord, Context context) {
for (LogRecordProcessor logRecordProcessor : logRecordProcessors) {
logRecordProcessor.onEmit(logRecord);
logRecordProcessor.onEmit(logRecord, context);
}
}

Expand Down
Expand Up @@ -5,6 +5,8 @@

package io.opentelemetry.sdk.logs;

import io.opentelemetry.context.Context;

final class NoopLogRecordProcessor implements LogRecordProcessor {
private static final NoopLogRecordProcessor INSTANCE = new NoopLogRecordProcessor();

Expand All @@ -15,5 +17,5 @@ static LogRecordProcessor getInstance() {
private NoopLogRecordProcessor() {}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {}
public void onEmit(ReadWriteLogRecord logRecord, Context context) {}
}
Expand Up @@ -10,7 +10,6 @@
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.internal.AttributesMap;
Expand All @@ -27,7 +26,7 @@ final class SdkLogRecordBuilder implements EventBuilder {

private final InstrumentationScopeInfo instrumentationScopeInfo;
private long epochNanos;
private SpanContext spanContext = SpanContext.getInvalid();
@Nullable private Context context;
private Severity severity = Severity.UNDEFINED_SEVERITY_NUMBER;
@Nullable private String severityText;
private Body body = Body.empty();
Expand All @@ -54,7 +53,7 @@ public SdkLogRecordBuilder setEpoch(Instant instant) {

@Override
public SdkLogRecordBuilder setContext(Context context) {
this.spanContext = Span.fromContext(context).getSpanContext();
this.context = context;
return this;
}

Expand Down Expand Up @@ -95,6 +94,7 @@ public void emit() {
if (loggerSharedState.hasBeenShutdown()) {
return;
}
Context context = this.context == null ? Context.current() : this.context;
loggerSharedState
.getLogRecordProcessor()
.onEmit(
Expand All @@ -103,10 +103,11 @@ public void emit() {
loggerSharedState.getResource(),
instrumentationScopeInfo,
this.epochNanos == 0 ? this.loggerSharedState.getClock().now() : this.epochNanos,
spanContext,
Span.fromContext(context).getSpanContext(),
severity,
severityText,
body,
attributes));
attributes),
context);
}
}
Expand Up @@ -9,6 +9,7 @@

import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.logs.Logger;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
Expand Down Expand Up @@ -57,8 +58,9 @@ public SdkLoggerProviderBuilder setLogLimits(Supplier<LogLimits> logLimitsSuppli
}

/**
* Add a log processor. {@link LogRecordProcessor#onEmit(ReadWriteLogRecord)} will be called each
* time a log is emitted by {@link Logger} instances obtained from the {@link SdkLoggerProvider}.
* Add a log processor. {@link LogRecordProcessor#onEmit(ReadWriteLogRecord, Context)} will be
* called each time a log is emitted by {@link Logger} instances obtained from the {@link
* SdkLoggerProvider}.
*
* @param processor the log processor
* @return this
Expand Down
Expand Up @@ -10,6 +10,7 @@
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
Expand Down Expand Up @@ -81,7 +82,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {
public void onEmit(ReadWriteLogRecord logRecord, Context context) {
if (logRecord == null) {
return;
}
Expand Down
Expand Up @@ -7,6 +7,7 @@

import static java.util.Objects.requireNonNull;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
Expand Down Expand Up @@ -58,7 +59,7 @@ public static LogRecordProcessor create(LogRecordExporter exporter) {
}

@Override
public void onEmit(ReadWriteLogRecord logRecord) {
public void onEmit(ReadWriteLogRecord logRecord, Context context) {
try {
List<LogRecordData> logs = Collections.singletonList(logRecord.toLogRecordData());
CompletableResultCode result = logRecordExporter.export(logs);
Expand Down
Expand Up @@ -10,6 +10,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -39,7 +40,7 @@ void setup() {
void empty() {
LogRecordProcessor multiLogRecordProcessor = LogRecordProcessor.composite();
assertThat(multiLogRecordProcessor).isInstanceOf(NoopLogRecordProcessor.class);
multiLogRecordProcessor.onEmit(logRecord);
multiLogRecordProcessor.onEmit(logRecord, Context.current());
multiLogRecordProcessor.shutdown();
}

Expand All @@ -53,9 +54,10 @@ void oneLogRecordProcessor() {
void twoLogRecordProcessor() {
LogRecordProcessor multiLogRecordProcessor =
LogRecordProcessor.composite(logRecordProcessor1, logRecordProcessor2);
multiLogRecordProcessor.onEmit(logRecord);
verify(logRecordProcessor1).onEmit(same(logRecord));
verify(logRecordProcessor2).onEmit(same(logRecord));
Context context = Context.current();
multiLogRecordProcessor.onEmit(logRecord, context);
verify(logRecordProcessor1).onEmit(same(logRecord), same(context));
verify(logRecordProcessor2).onEmit(same(logRecord), same(context));

multiLogRecordProcessor.forceFlush();
verify(logRecordProcessor1).forceFlush();
Expand Down
Expand Up @@ -7,6 +7,7 @@

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import io.opentelemetry.context.Context;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -20,7 +21,7 @@ class NoopLogRecordProcessorTest {
@Test
void noCrash() {
LogRecordProcessor logRecordProcessor = NoopLogRecordProcessor.getInstance();
logRecordProcessor.onEmit(logRecord);
logRecordProcessor.onEmit(logRecord, Context.current());
assertThat(logRecordProcessor.forceFlush().isSuccess()).isEqualTo(true);
assertThat(logRecordProcessor.shutdown().isSuccess()).isEqualTo(true);
}
Expand Down
Expand Up @@ -47,7 +47,8 @@ class SdkLogRecordBuilderTest {
@BeforeEach
void setup() {
when(loggerSharedState.getLogLimits()).thenReturn(LogLimits.getDefault());
when(loggerSharedState.getLogRecordProcessor()).thenReturn(emittedLog::set);
when(loggerSharedState.getLogRecordProcessor())
.thenReturn((logRecord, context) -> emittedLog.set(logRecord));
when(loggerSharedState.getResource()).thenReturn(RESOURCE);
when(loggerSharedState.getClock()).thenReturn(Clock.getDefault());

Expand Down
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.sdk.testing.assertj.LogAssertions.assertThat;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.entry;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -21,6 +22,8 @@
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.common.CompletableResultCode;
Expand All @@ -29,6 +32,7 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.InstanceOfAssertFactories;
Expand Down Expand Up @@ -222,7 +226,7 @@ void loggerBuilder_WithLogRecordProcessor() {
SdkLoggerProvider.builder()
.setResource(resource)
.addLogRecordProcessor(
logRecord -> {
(logRecord, unused) -> {
logRecord.setAttribute(null, null);
// Overwrite k1
logRecord.setAttribute(AttributeKey.stringKey("k1"), "new-v1");
Expand Down Expand Up @@ -262,6 +266,50 @@ void loggerBuilder_WithLogRecordProcessor() {
Attributes.builder().put("k1", "new-v1").put("k2", "v2").put("k3", "v3").build());
}

@Test
void loggerBuilder_ProcessorWithContext() {
ContextKey<String> contextKey = ContextKey.named("my-context-key");
AtomicReference<LogRecordData> logRecordData = new AtomicReference<>();

sdkLoggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
(logRecord, context) ->
logRecord.setAttribute(
AttributeKey.stringKey("my-context-key"),
Optional.ofNullable(context.get(contextKey)).orElse("")))
.addLogRecordProcessor(
(logRecord, unused) -> logRecordData.set(logRecord.toLogRecordData()))
.build();

// With implicit context
try (Scope unused = Context.current().with(contextKey, "context-value1").makeCurrent()) {
sdkLoggerProvider
.loggerBuilder("test")
.build()
.logRecordBuilder()
.setBody("log message1")
.emit();
}
assertThat(logRecordData.get())
.hasBody("log message1")
.hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value1"));

// With explicit context
try (Scope unused = Context.current().with(contextKey, "context-value2").makeCurrent()) {
sdkLoggerProvider
.loggerBuilder("test")
.build()
.logRecordBuilder()
.setContext(Context.current())
.setBody("log message2")
.emit();
}
assertThat(logRecordData.get())
.hasBody("log message2")
.hasAttributes(entry(AttributeKey.stringKey("my-context-key"), "context-value2"));
}

@Test
void forceFlush() {
sdkLoggerProvider.forceFlush();
Expand All @@ -288,7 +336,7 @@ void canSetClock() {
Clock clock = mock(Clock.class);
when(clock.now()).thenReturn(now);
List<ReadWriteLogRecord> seenLogs = new ArrayList<>();
logRecordProcessor = seenLogs::add;
logRecordProcessor = (logRecord, context) -> seenLogs.add(logRecord);
sdkLoggerProvider =
SdkLoggerProvider.builder()
.setClock(clock)
Expand Down
Expand Up @@ -46,7 +46,7 @@ void logRecordBuilder() {
LoggerSharedState state = mock(LoggerSharedState.class);
InstrumentationScopeInfo info = InstrumentationScopeInfo.create("foo");
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
LogRecordProcessor logRecordProcessor = seenLog::set;
LogRecordProcessor logRecordProcessor = (logRecord, context) -> seenLog.set(logRecord);
Clock clock = mock(Clock.class);
when(clock.now()).thenReturn(5L);

Expand All @@ -69,7 +69,7 @@ void logRecordBuilder_maxAttributeLength() {
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(seenLog::set)
.addLogRecordProcessor((logRecord, context) -> seenLog.set(logRecord))
.setLogLimits(() -> LogLimits.builder().setMaxAttributeValueLength(maxLength).build())
.build();
LogRecordBuilder logRecordBuilder = loggerProvider.get("test").logRecordBuilder();
Expand Down Expand Up @@ -109,7 +109,7 @@ void logRecordBuilder_maxAttributes() {
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(seenLog::set)
.addLogRecordProcessor((logRecord, context) -> seenLog.set(logRecord))
.setLogLimits(
() -> LogLimits.builder().setMaxNumberOfAttributes(maxNumberOfAttrs).build())
.build();
Expand Down Expand Up @@ -140,15 +140,17 @@ void logRecordBuilder_AfterShutdown() {
loggerProvider.shutdown().join(10, TimeUnit.SECONDS);
loggerProvider.get("test").logRecordBuilder().emit();

verify(logRecordProcessor, never()).onEmit(any());
verify(logRecordProcessor, never()).onEmit(any(), any());
}

@Test
@SuppressLogger(loggerName = API_USAGE_LOGGER_NAME)
void eventBuilder() {
AtomicReference<ReadWriteLogRecord> seenLog = new AtomicReference<>();
SdkLoggerProvider loggerProvider =
SdkLoggerProvider.builder().addLogRecordProcessor(seenLog::set).build();
SdkLoggerProvider.builder()
.addLogRecordProcessor((logRecord, context) -> seenLog.set(logRecord))
.build();

// Emit event from logger with name and add event domain
loggerProvider
Expand Down
Expand Up @@ -291,7 +291,7 @@ void ignoresNullLogs() {
BatchLogRecordProcessor processor =
BatchLogRecordProcessor.builder(mockLogRecordExporter).build();
try {
assertThatCode(() -> processor.onEmit(null)).doesNotThrowAnyException();
assertThatCode(() -> processor.onEmit(null, null)).doesNotThrowAnyException();
} finally {
processor.shutdown();
}
Expand Down