Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add option in BatchSpanProcessor to support multiple pending exports #4280

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -69,14 +71,16 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
long scheduleDelayNanos,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos) {
long exporterTimeoutNanos,
int maxActiveExports) {
this.worker =
new Worker(
spanExporter,
meterProvider,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
maxActiveExports,
JcTools.newFixedSizeQueue(maxQueueSize));
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
Expand Down Expand Up @@ -149,6 +153,7 @@ private static final class Worker implements Runnable {
private final long scheduleDelayNanos;
private final int maxExportBatchSize;
private final long exporterTimeoutNanos;
private final int maxActiveExports;

private long nextExportTime;

Expand All @@ -165,17 +170,22 @@ private static final class Worker implements Runnable {
private volatile boolean continueWork = true;
private final ArrayList<SpanData> batch;

private final Set<CompletableResultCode> activeExports =
Collections.newSetFromMap(new ConcurrentHashMap<>());
trask marked this conversation as resolved.
Show resolved Hide resolved
trask marked this conversation as resolved.
Show resolved Hide resolved

private Worker(
SpanExporter spanExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
int maxActiveExports,
Queue<ReadableSpan> queue) {
this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutNanos = exporterTimeoutNanos;
this.maxActiveExports = maxActiveExports;
this.queue = queue;
this.signal = new ArrayBlockingQueue<>(1);
Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.trace").build();
Expand Down Expand Up @@ -266,6 +276,7 @@ private void flush() {
}
}
exportCurrentBatch();
CompletableResultCode.ofAll(activeExports).join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
CompletableResultCode flushResult = flushRequested.get();
if (flushResult != null) {
flushResult.succeed();
Expand Down Expand Up @@ -318,11 +329,25 @@ private void exportCurrentBatch() {

try {
CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedSpansCounter.add(batch.size(), exportedAttrs);
result.whenComplete(
() -> {
if (result.isSuccess()) {
processedSpansCounter.add(batch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
}
});
if (activeExports.size() < maxActiveExports - 1) {
activeExports.add(result);
result.whenComplete(
() -> {
activeExports.remove(result);
});
} else {
logger.log(Level.FINE, "Exporter failed");
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (!result.isDone()) {
logger.log(Level.FINE, "Exporter timed out");
}
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
Expand Down
Expand Up @@ -23,12 +23,15 @@ public final class BatchSpanProcessorBuilder {
static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
// Visible for testing
static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;
// Visible for testing
static final int DEFAULT_MAX_ACTIVE_EXPORTS = 1;

private final SpanExporter spanExporter;
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
private int maxActiveExports = DEFAULT_MAX_ACTIVE_EXPORTS;
private MeterProvider meterProvider = MeterProvider.noop();

BatchSpanProcessorBuilder(SpanExporter spanExporter) {
Expand Down Expand Up @@ -122,6 +125,31 @@ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
return this;
}

/**
 * Sets the upper bound on the number of exports that may be in flight at the same time.
 *
 * <p>The {@link BatchSpanProcessor} runs a single worker thread. Instead of blocking on every
 * {@link io.opentelemetry.sdk.common.CompletableResultCode} handed back by the {@code
 * spanExporter}, that thread keeps dispatching batches for as long as it can, while capping the
 * total number of active exports at this value.
 *
 * <p>Default value is {@code 1}.
 *
 * @param maxActiveExports the maximum number of exports that can be active at any time; must be
 *     greater than zero.
 * @return this.
 * @see BatchSpanProcessorBuilder#DEFAULT_MAX_ACTIVE_EXPORTS
 */
public BatchSpanProcessorBuilder setMaxActiveExports(int maxActiveExports) {
  // Zero or negative values are rejected before the builder state is touched.
  boolean isPositive = maxActiveExports > 0;
  checkArgument(isPositive, "maxActiveExports must be positive.");
  this.maxActiveExports = maxActiveExports;
  return this;
}

// Package-private accessor for the configured limit; visible for testing.
int getMaxActiveExports() {
  return this.maxActiveExports;
}

/**
* Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set,
* metrics will not be collected.
Expand Down Expand Up @@ -150,6 +178,7 @@ public BatchSpanProcessor build() {
scheduleDelayNanos,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutNanos);
exporterTimeoutNanos,
maxActiveExports);
}
}
Expand Up @@ -31,8 +31,11 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -92,6 +95,8 @@ void builderDefaults() {
assertThat(builder.getExporterTimeoutNanos())
.isEqualTo(
TimeUnit.MILLISECONDS.toNanos(BatchSpanProcessorBuilder.DEFAULT_EXPORT_TIMEOUT_MILLIS));
assertThat(builder.getMaxActiveExports())
.isEqualTo(BatchSpanProcessorBuilder.DEFAULT_MAX_ACTIVE_EXPORTS);
}

@Test
Expand Down Expand Up @@ -189,7 +194,7 @@ void exportMoreSpansThanTheBufferSize() {
@Test
void forceExport() {
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1000);
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.builder(waitingSpanExporter)
.setMaxQueueSize(10_000)
Expand Down Expand Up @@ -220,6 +225,69 @@ void forceExport() {
assertThat(exported.size()).isEqualTo(2);
}

/**
 * Verifies that a processor configured with {@code setMaxActiveExports(10)} overlaps slow exports
 * and that {@code forceFlush()} delivers the spans that did not fill a complete batch.
 */
@Test
void forceFlushWithConcurrentExports() {
  // High-water mark of exports that were in flight at the same time.
  AtomicInteger maxActiveCount = new AtomicInteger();
  // asyncDelayMillis is large enough so that two serial exports would exceed the timeoutMillis
  WaitingSpanExporter waitingSpanExporter =
      new WaitingSpanExporter(1000, CompletableResultCode.ofSuccess(), 1000, 600) {

        private final AtomicInteger activeCount = new AtomicInteger();

        @Override
        public CompletableResultCode export(Collection<SpanData> spans) {
          // Record the peak atomically rather than with a separate get() and set().
          maxActiveCount.accumulateAndGet(activeCount.incrementAndGet(), Math::max);
          CompletableResultCode result = super.export(spans);
          // The result is already complete when this callback runs, so only the in-flight
          // counter needs updating; the outcome must not be set a second time.
          result.whenComplete(activeCount::decrementAndGet);
          return result;
        }
      };

  BatchSpanProcessor batchSpanProcessor =
      BatchSpanProcessor.builder(waitingSpanExporter)
          .setMaxQueueSize(10_000)
          // Force flush should send all spans, make sure the number of spans we check here is
          // not divisible by the batch size.
          .setMaxExportBatchSize(49)
          .setMaxActiveExports(10)
          // scheduled export could give false positive that flush occurred
          .setScheduleDelay(10, TimeUnit.MINUTES)
          .build();

  sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
  for (int i = 0; i < 500; i++) {
    createEndedSpan("notExported");
  }
  // 490 = 10 full batches of 49; the remaining 10 spans are expected to stay queued.
  List<SpanData> exported = waitingSpanExporter.waitForExport();
  assertThat(exported).isNotNull();
  assertThat(exported.size()).isEqualTo(490);

  for (int i = 0; i < 500; i++) {
    createEndedSpan("notExported");
  }
  // Again 10 full batches; 10 + 500 - 490 = 20 spans are expected to be left over.
  exported = waitingSpanExporter.waitForExport();
  assertThat(exported).isNotNull();
  assertThat(exported.size()).isEqualTo(490);

  batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
  exported = waitingSpanExporter.getExported();
  assertThat(exported).isNotNull();
  assertThat(exported.size()).isEqualTo(20);

  assertThat(maxActiveCount.get()).isEqualTo(10);
}

@Test
void exportSpansToMultipleExporters() {
WaitingSpanExporter waitingSpanExporter =
Expand Down Expand Up @@ -610,28 +678,48 @@ public CompletableResultCode shutdown() {

static class WaitingSpanExporter implements SpanExporter {

private static final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();

private final Object lock = new Object();

@GuardedBy("lock")
private final List<SpanData> spanDataList = new ArrayList<>();

private final int numberToWaitFor;
private final CompletableResultCode exportResultCode;
private CountDownLatch countDownLatch;
private int timeout = 10;
private final int timeoutMillis;
private final int asyncDelayMillis;
private final AtomicBoolean shutDownCalled = new AtomicBoolean(false);

WaitingSpanExporter(int numberToWaitFor, CompletableResultCode exportResultCode) {
this(numberToWaitFor, exportResultCode, 10_000);
}

WaitingSpanExporter(
int numberToWaitFor, CompletableResultCode exportResultCode, int timeoutMillis) {
this(numberToWaitFor, exportResultCode, timeoutMillis, 0);
}

WaitingSpanExporter(
int numberToWaitFor,
CompletableResultCode exportResultCode,
int timeoutMillis,
int asyncDelayMillis) {
countDownLatch = new CountDownLatch(numberToWaitFor);
this.numberToWaitFor = numberToWaitFor;
this.exportResultCode = exportResultCode;
}

WaitingSpanExporter(int numberToWaitFor, CompletableResultCode exportResultCode, int timeout) {
this(numberToWaitFor, exportResultCode);
this.timeout = timeout;
this.timeoutMillis = timeoutMillis;
this.asyncDelayMillis = asyncDelayMillis;
}

List<SpanData> getExported() {
List<SpanData> result = new ArrayList<>(spanDataList);
spanDataList.clear();
return result;
synchronized (lock) {
List<SpanData> result = new ArrayList<>(spanDataList);
spanDataList.clear();
return result;
}
}

/**
Expand All @@ -644,7 +732,7 @@ List<SpanData> getExported() {
@Nullable
List<SpanData> waitForExport() {
try {
countDownLatch.await(timeout, TimeUnit.SECONDS);
countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Preserve the interruption status as per guidance.
Thread.currentThread().interrupt();
Expand All @@ -655,11 +743,40 @@ List<SpanData> waitForExport() {

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
this.spanDataList.addAll(spans);
if (asyncDelayMillis != 0) {
return storeAfterDelay(spans);
} else {
store(spans);
return exportResultCode;
}
}

/**
 * Schedules the given spans to be stored after {@code asyncDelayMillis} and returns a result
 * code that is completed only once that delayed store has run.
 *
 * @param spans the spans handed to {@code export}; copied before this method returns.
 * @return a result code that succeeds or fails according to {@code exportResultCode}.
 */
@SuppressWarnings("FutureReturnValueIgnored")
private CompletableResultCode storeAfterDelay(Collection<SpanData> spans) {
  CompletableResultCode pending = new CompletableResultCode();
  // batch span processor clears the underlying collection immediately after calling export,
  // so take a snapshot now instead of reading it when the delayed task fires
  List<SpanData> snapshot = new ArrayList<>(spans);
  Runnable delayedStore =
      () -> {
        store(snapshot);
        // Mirror the outcome of the configured result code onto the per-call result.
        if (!exportResultCode.isSuccess()) {
          pending.fail();
          return;
        }
        pending.succeed();
      };
  scheduledExecutorService.schedule(delayedStore, asyncDelayMillis, TimeUnit.MILLISECONDS);
  return pending;
}

private void store(Collection<SpanData> spans) {
synchronized (lock) {
this.spanDataList.addAll(spans);
}
for (int i = 0; i < spans.size(); i++) {
countDownLatch.countDown();
}
return exportResultCode;
}

@Override
Expand Down