Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] Added SpanProcessor beforeEnd callback #6367

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -18,6 +18,7 @@
*/
final class MultiSpanProcessor implements SpanProcessor {
private final List<SpanProcessor> spanProcessorsStart;
private final List<SpanProcessor> spanProcessorsBeforeEnd;
private final List<SpanProcessor> spanProcessorsEnd;
private final List<SpanProcessor> spanProcessorsAll;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
Expand Down Expand Up @@ -58,6 +59,18 @@ public boolean isEndRequired() {
return !spanProcessorsEnd.isEmpty();
}

@Override
public void beforeEnd(ReadWriteSpan span) {
for (SpanProcessor spanProcessor : spanProcessorsBeforeEnd) {
spanProcessor.beforeEnd(span);
}
}

@Override
public boolean isBeforeEndRequired() {
return !spanProcessorsBeforeEnd.isEmpty();
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
Expand All @@ -83,10 +96,14 @@ private MultiSpanProcessor(List<SpanProcessor> spanProcessors) {
this.spanProcessorsAll = spanProcessors;
this.spanProcessorsStart = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsEnd = new ArrayList<>(spanProcessorsAll.size());
this.spanProcessorsBeforeEnd = new ArrayList<>(spanProcessorsAll.size());
for (SpanProcessor spanProcessor : spanProcessorsAll) {
if (spanProcessor.isStartRequired()) {
spanProcessorsStart.add(spanProcessor);
}
if (spanProcessor.isBeforeEndRequired()) {
spanProcessorsBeforeEnd.add(spanProcessor);
}
if (spanProcessor.isEndRequired()) {
spanProcessorsEnd.add(spanProcessor);
}
Expand Down
43 changes: 26 additions & 17 deletions sdk/trace/src/main/java/io/opentelemetry/sdk/trace/SdkSpan.java
Expand Up @@ -95,9 +95,14 @@ final class SdkSpan implements ReadWriteSpan {
@GuardedBy("lock")
private long endEpochNanos;

// True if the span is ended.
private enum EndState {
NOT_ENDED,
ENDING,
ENDED
}

@GuardedBy("lock")
private boolean hasEnded;
private EndState hasEnded;

private SdkSpan(
SpanContext context,
Expand All @@ -122,7 +127,7 @@ private SdkSpan(
this.kind = kind;
this.spanProcessor = spanProcessor;
this.resource = resource;
this.hasEnded = false;
this.hasEnded = EndState.NOT_ENDED;
this.clock = clock;
this.startEpochNanos = startEpochNanos;
this.attributes = attributes;
Expand Down Expand Up @@ -220,7 +225,7 @@ public SpanData toSpanData() {
status,
name,
endEpochNanos,
hasEnded);
hasEnded == EndState.ENDED);
}
}

Expand All @@ -235,7 +240,7 @@ public <T> T getAttribute(AttributeKey<T> key) {
@Override
public boolean hasEnded() {
synchronized (lock) {
return hasEnded;
return hasEnded == EndState.ENDED;
}
}

Expand Down Expand Up @@ -281,7 +286,7 @@ public InstrumentationScopeInfo getInstrumentationScopeInfo() {
@Override
public long getLatencyNanos() {
synchronized (lock) {
return (hasEnded ? endEpochNanos : clock.now()) - startEpochNanos;
return (hasEnded == EndState.NOT_ENDED ? clock.now() : endEpochNanos) - startEpochNanos;
}
}

Expand All @@ -296,7 +301,7 @@ public <T> ReadWriteSpan setAttribute(AttributeKey<T> key, T value) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling setAttribute() on an ended Span.");
return this;
}
Expand Down Expand Up @@ -373,7 +378,7 @@ public ReadWriteSpan addEvent(String name, Attributes attributes, long timestamp

private void addTimedEvent(EventData timedEvent) {
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling addEvent() on an ended Span.");
return;
}
Expand All @@ -393,7 +398,7 @@ public ReadWriteSpan setStatus(StatusCode statusCode, @Nullable String descripti
return this;
}
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling setStatus() on an ended Span.");
return this;
} else if (this.status.getStatusCode() == StatusCode.OK) {
Expand Down Expand Up @@ -431,7 +436,7 @@ public ReadWriteSpan updateName(String name) {
return this;
}
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling updateName() on an ended Span.");
return this;
}
Expand All @@ -456,7 +461,7 @@ public Span addLink(SpanContext spanContext, Attributes attributes) {
spanLimits.getMaxNumberOfAttributesPerLink(),
spanLimits.getMaxAttributeValueLength()));
synchronized (lock) {
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
logger.log(Level.FINE, "Calling addLink() on an ended Span.");
return this;
}
Expand Down Expand Up @@ -486,12 +491,16 @@ public void end(long timestamp, TimeUnit unit) {

private void endInternal(long endEpochNanos) {
synchronized (lock) {
if (hasEnded) {
logger.log(Level.FINE, "Calling end() on an ended Span.");
if (hasEnded != EndState.NOT_ENDED) {
logger.log(Level.FINE, "Calling end() on an ended or ending Span.");
return;
}
hasEnded = EndState.ENDING;
this.endEpochNanos = endEpochNanos;
hasEnded = true;
if (spanProcessor.isBeforeEndRequired()) {
spanProcessor.beforeEnd(this);
}
hasEnded = EndState.ENDED;
}
if (spanProcessor.isEndRequired()) {
spanProcessor.onEnd(this);
Expand All @@ -501,7 +510,7 @@ private void endInternal(long endEpochNanos) {
@Override
public boolean isRecording() {
synchronized (lock) {
return !hasEnded;
return hasEnded != EndState.ENDED;
}
}

Expand All @@ -526,7 +535,7 @@ private List<EventData> getImmutableTimedEvents() {

// if the span has ended, then the events are unmodifiable
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return Collections.unmodifiableList(events);
}

Expand All @@ -540,7 +549,7 @@ private Attributes getImmutableAttributes() {
}
// if the span has ended, then the attributes are unmodifiable,
// so we can return them directly and save copying all the data.
if (hasEnded) {
if (hasEnded == EndState.ENDED) {
return attributes;
}
// otherwise, make a copy of the data into an immutable container.
Expand Down
Expand Up @@ -85,6 +85,26 @@ static SpanProcessor composite(Iterable<SpanProcessor> processors) {
*/
boolean isEndRequired();


/**
* Called just before a {@link io.opentelemetry.api.trace.Span} is ended, if the {@link
* Span#isRecording()} returns true. This means that the span will still be mutable
*
* <p>This method is called synchronously on the execution thread, should not throw or block the
* execution thread.
*
* @param span the {@code Span} that is just about to be ended.
*/
default void beforeEnd(ReadWriteSpan span) {
}

/**
* Returns {@code true} if this {@link SpanProcessor} requires before-end events.
*/
default boolean isBeforeEndRequired() {
return false;
}

/**
* Processes all span events that have not yet been processed and closes used resources.
*
Expand Down
Expand Up @@ -35,10 +35,12 @@ class MultiSpanProcessorTest {
@BeforeEach
void setUp() {
when(spanProcessor1.isStartRequired()).thenReturn(true);
when(spanProcessor1.isBeforeEndRequired()).thenReturn(true);
when(spanProcessor1.isEndRequired()).thenReturn(true);
when(spanProcessor1.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor1.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.isStartRequired()).thenReturn(true);
when(spanProcessor2.isBeforeEndRequired()).thenReturn(true);
when(spanProcessor2.isEndRequired()).thenReturn(true);
when(spanProcessor2.forceFlush()).thenReturn(CompletableResultCode.ofSuccess());
when(spanProcessor2.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
Expand Down Expand Up @@ -67,6 +69,10 @@ void twoSpanProcessor() {
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2).onStart(same(Context.root()), same(readWriteSpan));

multiSpanProcessor.beforeEnd(readWriteSpan);
verify(spanProcessor1).beforeEnd(same(readWriteSpan));
verify(spanProcessor2).beforeEnd(same(readWriteSpan));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1).onEnd(same(readableSpan));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand All @@ -83,6 +89,7 @@ void twoSpanProcessor() {
@Test
void twoSpanProcessor_DifferentRequirements() {
when(spanProcessor1.isEndRequired()).thenReturn(false);
when(spanProcessor2.isBeforeEndRequired()).thenReturn(false);
when(spanProcessor2.isStartRequired()).thenReturn(false);
SpanProcessor multiSpanProcessor =
SpanProcessor.composite(Arrays.asList(spanProcessor1, spanProcessor2));
Expand All @@ -94,6 +101,10 @@ void twoSpanProcessor_DifferentRequirements() {
verify(spanProcessor1).onStart(same(Context.root()), same(readWriteSpan));
verify(spanProcessor2, times(0)).onStart(any(Context.class), any(ReadWriteSpan.class));

multiSpanProcessor.beforeEnd(readWriteSpan);
verify(spanProcessor1).beforeEnd(same(readWriteSpan));
verify(spanProcessor2, times(0)).beforeEnd(any(ReadWriteSpan.class));

multiSpanProcessor.onEnd(readableSpan);
verify(spanProcessor1, times(0)).onEnd(any(ReadableSpan.class));
verify(spanProcessor2).onEnd(same(readableSpan));
Expand Down
Expand Up @@ -17,6 +17,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -57,6 +59,8 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -106,6 +110,7 @@ void setUp() {
expectedAttributes = builder.build();
testClock = TestClock.create(Instant.ofEpochSecond(0, START_EPOCH_NANOS));
when(spanProcessor.isStartRequired()).thenReturn(true);
when(spanProcessor.isBeforeEndRequired()).thenReturn(true);
when(spanProcessor.isEndRequired()).thenReturn(true);
}

Expand Down Expand Up @@ -139,6 +144,54 @@ void endSpanTwice_DoNotCrash() {
assertThat(span.hasEnded()).isTrue();
}

@Test
void beforeEnd_spanStillMutable() {
SdkSpan span = createTestSpan(SpanKind.INTERNAL);

AttributeKey<String> dummyAttrib = AttributeKey.stringKey("processor_foo");

AtomicBoolean endedStateInProcessor = new AtomicBoolean();
doAnswer(invocation -> {
ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class);
assertThat(sp.hasEnded()).isFalse();
sp.end(); //should have no effect, nested end should be detected
endedStateInProcessor.set(sp.hasEnded());
sp.setAttribute(dummyAttrib, "bar");
return null;
}).when(spanProcessor).beforeEnd(any());

span.end();
verify(spanProcessor).beforeEnd(same(span));
assertThat(span.hasEnded()).isTrue();
assertThat(endedStateInProcessor.get()).isFalse();
assertThat(span.getAttribute(dummyAttrib)).isEqualTo("bar");
}

@Test
void beforeEnd_latencyPinned() {
SdkSpan span = createTestSpan(SpanKind.INTERNAL);

AtomicLong spanLatencyInProcessor = new AtomicLong();
doAnswer(invocation -> {
ReadWriteSpan sp = invocation.getArgument(0, ReadWriteSpan.class);

testClock.advance(Duration.ofSeconds(100));
spanLatencyInProcessor.set(sp.getLatencyNanos());
return null;
}).when(spanProcessor).beforeEnd(any());

testClock.advance(Duration.ofSeconds(1));
long expectedDuration = testClock.now() - START_EPOCH_NANOS;

assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration);

span.end();
verify(spanProcessor).beforeEnd(same(span));
assertThat(span.hasEnded()).isTrue();
assertThat(span.getLatencyNanos()).isEqualTo(expectedDuration);
assertThat(spanLatencyInProcessor.get()).isEqualTo(expectedDuration);
}

@Test
void toSpanData_ActiveSpan() {
SdkSpan span = createTestSpan(SpanKind.INTERNAL);
Expand Down