Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option in BatchSpanProcessor to support multiple pending exports #4280

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,8 +21,10 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -69,14 +71,16 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
long scheduleDelayNanos,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos) {
long exporterTimeoutNanos,
int maxPendingExports) {
this.worker =
new Worker(
spanExporter,
meterProvider,
scheduleDelayNanos,
maxExportBatchSize,
exporterTimeoutNanos,
maxPendingExports,
JcTools.newFixedSizeQueue(maxQueueSize));
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();
Expand Down Expand Up @@ -149,6 +153,7 @@ private static final class Worker implements Runnable {
private final long scheduleDelayNanos;
private final int maxExportBatchSize;
private final long exporterTimeoutNanos;
private final int maxPendingExports;

private long nextExportTime;

Expand All @@ -165,17 +170,22 @@ private static final class Worker implements Runnable {
private volatile boolean continueWork = true;
private final ArrayList<SpanData> batch;

private final Set<CompletableResultCode> pendingExports =
Collections.newSetFromMap(new ConcurrentHashMap<>());
trask marked this conversation as resolved.
Show resolved Hide resolved

private Worker(
SpanExporter spanExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxExportBatchSize,
long exporterTimeoutNanos,
int maxPendingExports,
Queue<ReadableSpan> queue) {
this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutNanos = exporterTimeoutNanos;
this.maxPendingExports = maxPendingExports;
this.queue = queue;
this.signal = new ArrayBlockingQueue<>(1);
Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.trace").build();
Expand Down Expand Up @@ -266,6 +276,7 @@ private void flush() {
}
}
exportCurrentBatch();
CompletableResultCode.ofAll(pendingExports).join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
CompletableResultCode flushResult = flushRequested.get();
if (flushResult != null) {
flushResult.succeed();
Expand Down Expand Up @@ -318,11 +329,25 @@ private void exportCurrentBatch() {

try {
CompletableResultCode result = spanExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedSpansCounter.add(batch.size(), exportedAttrs);
result.whenComplete(
() -> {
if (result.isSuccess()) {
processedSpansCounter.add(batch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
}
});
if (pendingExports.size() < maxPendingExports - 1) {
pendingExports.add(result);
result.whenComplete(
() -> {
pendingExports.remove(result);
});
} else {
logger.log(Level.FINE, "Exporter failed");
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (!result.isDone()) {
logger.log(Level.FINE, "Exporter timed out");
}
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
Expand Down
Expand Up @@ -23,12 +23,15 @@ public final class BatchSpanProcessorBuilder {
static final int DEFAULT_MAX_EXPORT_BATCH_SIZE = 512;
// Visible for testing
static final int DEFAULT_EXPORT_TIMEOUT_MILLIS = 30_000;
// Visible for testing
static final int DEFAULT_MAX_PENDING_EXPORTS = 1;

private final SpanExporter spanExporter;
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
private int maxPendingExports = DEFAULT_MAX_PENDING_EXPORTS;
private MeterProvider meterProvider = MeterProvider.noop();

BatchSpanProcessorBuilder(SpanExporter spanExporter) {
Expand Down Expand Up @@ -122,6 +125,31 @@ public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
return this;
}

/**
* The maximum number of exports that can be pending at any time.
*
* <p>The {@link BatchSpanProcessor}'s single worker thread will keep processing as many batches
* as it can without blocking on the {@link io.opentelemetry.sdk.common.CompletableResultCode}s
* that are returned from the {@code spanExporter}, but it will limit the total number of pending
* exports in flight to this number.
*
* <p>Default value is {@code 1}.
*
* @param maxPendingExports the maximum number of exports that can be pending at any time.
* @return this.
* @see BatchSpanProcessorBuilder#DEFAULT_MAX_PENDING_EXPORTS
*/
public BatchSpanProcessorBuilder setMaxPendingExports(int maxPendingExports) {
checkArgument(maxPendingExports > 0, "maxPendingExports must be positive.");
this.maxPendingExports = maxPendingExports;
return this;
}

// Visible for testing
int getMaxPendingExports() {
return maxPendingExports;
}

/**
* Sets the {@link MeterProvider} to use to collect metrics related to batch export. If not set,
* metrics will not be collected.
Expand Down Expand Up @@ -150,6 +178,7 @@ public BatchSpanProcessor build() {
scheduleDelayNanos,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutNanos);
exporterTimeoutNanos,
maxPendingExports);
}
}
Expand Up @@ -31,8 +31,11 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -92,6 +95,8 @@ void builderDefaults() {
assertThat(builder.getExporterTimeoutNanos())
.isEqualTo(
TimeUnit.MILLISECONDS.toNanos(BatchSpanProcessorBuilder.DEFAULT_EXPORT_TIMEOUT_MILLIS));
assertThat(builder.getMaxPendingExports())
.isEqualTo(BatchSpanProcessorBuilder.DEFAULT_MAX_PENDING_EXPORTS);
}

@Test
Expand Down Expand Up @@ -189,7 +194,7 @@ void exportMoreSpansThanTheBufferSize() {
@Test
void forceExport() {
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1);
new WaitingSpanExporter(100, CompletableResultCode.ofSuccess(), 1000);
BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.builder(waitingSpanExporter)
.setMaxQueueSize(10_000)
Expand Down Expand Up @@ -220,6 +225,69 @@ void forceExport() {
assertThat(exported.size()).isEqualTo(2);
}

@Test
void forceFlushWithConcurrentExports() {
AtomicInteger maxPendingCount = new AtomicInteger();
// asyncDelayMillis is large enough so that two serial exports would exceed the timeoutMillis
WaitingSpanExporter waitingSpanExporter =
new WaitingSpanExporter(1000, CompletableResultCode.ofSuccess(), 1000, 600) {

private final AtomicInteger pendingCount = new AtomicInteger();

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
int current = pendingCount.incrementAndGet();
if (current > maxPendingCount.get()) {
maxPendingCount.set(current);
}
CompletableResultCode result = super.export(spans);
result.whenComplete(
() -> {
pendingCount.decrementAndGet();
if (result.isSuccess()) {
result.succeed();
} else {
result.fail();
}
});
return result;
}
};

BatchSpanProcessor batchSpanProcessor =
BatchSpanProcessor.builder(waitingSpanExporter)
.setMaxQueueSize(10_000)
// Force flush should send all spans, make sure the number of spans we check here is
// not divisible by the batch size.
.setMaxExportBatchSize(49)
.setMaxPendingExports(10)
// scheduled export could give false positive that flush occurred
.setScheduleDelay(10, TimeUnit.MINUTES)
.build();

sdkTracerProvider = SdkTracerProvider.builder().addSpanProcessor(batchSpanProcessor).build();
for (int i = 0; i < 500; i++) {
createEndedSpan("notExported");
}
List<SpanData> exported = waitingSpanExporter.waitForExport();
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(490);

for (int i = 0; i < 500; i++) {
createEndedSpan("notExported");
}
exported = waitingSpanExporter.waitForExport();
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(490);

batchSpanProcessor.forceFlush().join(10, TimeUnit.SECONDS);
exported = waitingSpanExporter.getExported();
assertThat(exported).isNotNull();
assertThat(exported.size()).isEqualTo(20);

assertThat(maxPendingCount.get()).isEqualTo(10);
}

@Test
void exportSpansToMultipleExporters() {
WaitingSpanExporter waitingSpanExporter =
Expand Down Expand Up @@ -610,28 +678,48 @@ public CompletableResultCode shutdown() {

static class WaitingSpanExporter implements SpanExporter {

private static final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();

private final Object lock = new Object();

@GuardedBy("lock")
private final List<SpanData> spanDataList = new ArrayList<>();

private final int numberToWaitFor;
private final CompletableResultCode exportResultCode;
private CountDownLatch countDownLatch;
private int timeout = 10;
private final int timeoutMillis;
private final int asyncDelayMillis;
private final AtomicBoolean shutDownCalled = new AtomicBoolean(false);

WaitingSpanExporter(int numberToWaitFor, CompletableResultCode exportResultCode) {
this(numberToWaitFor, exportResultCode, 10_000);
}

WaitingSpanExporter(
int numberToWaitFor, CompletableResultCode exportResultCode, int timeoutMillis) {
this(numberToWaitFor, exportResultCode, timeoutMillis, 0);
}

WaitingSpanExporter(
int numberToWaitFor,
CompletableResultCode exportResultCode,
int timeoutMillis,
int asyncDelayMillis) {
countDownLatch = new CountDownLatch(numberToWaitFor);
this.numberToWaitFor = numberToWaitFor;
this.exportResultCode = exportResultCode;
}

WaitingSpanExporter(int numberToWaitFor, CompletableResultCode exportResultCode, int timeout) {
this(numberToWaitFor, exportResultCode);
this.timeout = timeout;
this.timeoutMillis = timeoutMillis;
this.asyncDelayMillis = asyncDelayMillis;
}

List<SpanData> getExported() {
List<SpanData> result = new ArrayList<>(spanDataList);
spanDataList.clear();
return result;
synchronized (lock) {
List<SpanData> result = new ArrayList<>(spanDataList);
spanDataList.clear();
return result;
}
}

/**
Expand All @@ -644,7 +732,7 @@ List<SpanData> getExported() {
@Nullable
List<SpanData> waitForExport() {
try {
countDownLatch.await(timeout, TimeUnit.SECONDS);
countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Preserve the interruption status as per guidance.
Thread.currentThread().interrupt();
Expand All @@ -655,11 +743,40 @@ List<SpanData> waitForExport() {

@Override
public CompletableResultCode export(Collection<SpanData> spans) {
this.spanDataList.addAll(spans);
if (asyncDelayMillis != 0) {
return storeAfterDelay(spans);
} else {
store(spans);
return exportResultCode;
}
}

@SuppressWarnings("FutureReturnValueIgnored")
private CompletableResultCode storeAfterDelay(Collection<SpanData> spans) {
// batch span processor clears the underlying collection immediately after calling export
List<SpanData> copy = new ArrayList<>(spans);
CompletableResultCode resultCode = new CompletableResultCode();
scheduledExecutorService.schedule(
() -> {
store(copy);
if (exportResultCode.isSuccess()) {
resultCode.succeed();
} else {
resultCode.fail();
}
},
asyncDelayMillis,
TimeUnit.MILLISECONDS);
return resultCode;
}

private void store(Collection<SpanData> spans) {
synchronized (lock) {
this.spanDataList.addAll(spans);
}
for (int i = 0; i < spans.size(); i++) {
countDownLatch.countDown();
}
return exportResultCode;
}

@Override
Expand Down