Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BatchLogRecordProcessor supports concurrent export #6372

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -22,6 +22,10 @@
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -69,6 +73,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
LogRecordExporter logRecordExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxConcurrentExport,
int maxQueueSize,
int maxExportBatchSize,
long exporterTimeoutNanos) {
Expand All @@ -77,6 +82,7 @@ public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecord
logRecordExporter,
meterProvider,
scheduleDelayNanos,
maxConcurrentExport,
maxExportBatchSize,
exporterTimeoutNanos,
new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..)
Expand Down Expand Up @@ -146,6 +152,8 @@ private static final class Worker implements Runnable {

private final LogRecordExporter logRecordExporter;
private final long scheduleDelayNanos;
private final int maxConcurrentExport;
private final Semaphore semaphore;
private final int maxExportBatchSize;
private final long exporterTimeoutNanos;

Expand All @@ -163,11 +171,13 @@ private static final class Worker implements Runnable {
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
private final ArrayList<LogRecordData> batch;
private final ExecutorService executor;

private Worker(
LogRecordExporter logRecordExporter,
MeterProvider meterProvider,
long scheduleDelayNanos,
int maxConcurrentExport,
int maxExportBatchSize,
long exporterTimeoutNanos,
Queue<ReadWriteLogRecord> queue) {
Expand Down Expand Up @@ -211,6 +221,9 @@ private Worker(
false);

this.batch = new ArrayList<>(this.maxExportBatchSize);
this.maxConcurrentExport = maxConcurrentExport;
this.semaphore = new Semaphore(maxConcurrentExport);
this.executor = Executors.newFixedThreadPool(maxConcurrentExport);
}

private void addLog(ReadWriteLogRecord logData) {
Expand Down Expand Up @@ -278,6 +291,7 @@ private void updateNextExportTime() {
}

private CompletableResultCode shutdown() {

CompletableResultCode result = new CompletableResultCode();

CompletableResultCode flushResult = forceFlush();
Expand All @@ -295,6 +309,15 @@ private CompletableResultCode shutdown() {
});
});

for (int i = 0; i < maxConcurrentExport; i++) {
try {
semaphore.acquire();
} catch (InterruptedException e){
logger.log(Level.WARNING, "Acquire Semaphore for shutdown failed", e);
}
}
executor.shutdown();

return result;
}

Expand All @@ -317,13 +340,41 @@ private void exportCurrentBatch() {
}

try {
CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedLogsCounter.add(batch.size(), exportedAttrs);
if (this.maxConcurrentExport == 1) {
CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(batch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedLogsCounter.add(batch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
}
} else {
logger.log(Level.FINE, "Exporter failed");
try {
semaphore.acquire();
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Acquire semaphore for exporter failed", e);
return;
}
ArrayList<LogRecordData> exportBatch = new ArrayList<>(batch.size());
exportBatch.addAll(batch);
Future<?> future = executor.submit(() -> {
try {
CompletableResultCode result =
logRecordExporter.export(Collections.unmodifiableList(exportBatch));
result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
if (result.isSuccess()) {
processedLogsCounter.add(exportBatch.size(), exportedAttrs);
} else {
logger.log(Level.FINE, "Exporter failed");
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
} finally {
semaphore.release();
}
});
future.isDone();
}
} catch (RuntimeException e) {
logger.log(Level.WARNING, "Exporter threw an Exception", e);
Expand Down
Expand Up @@ -21,6 +21,7 @@ public final class BatchLogRecordProcessorBuilder {

// Visible for testing
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000;
static final int DEFAULT_MAX_CONCURRENT_EXPORT = 1;
// Visible for testing
static final int DEFAULT_MAX_QUEUE_SIZE = 2048;
// Visible for testing
Expand All @@ -30,6 +31,7 @@ public final class BatchLogRecordProcessorBuilder {

private final LogRecordExporter logRecordExporter;
private long scheduleDelayNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_SCHEDULE_DELAY_MILLIS);
private int maxConcurrentExport = DEFAULT_MAX_CONCURRENT_EXPORT;
private int maxQueueSize = DEFAULT_MAX_QUEUE_SIZE;
private int maxExportBatchSize = DEFAULT_MAX_EXPORT_BATCH_SIZE;
private long exporterTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(DEFAULT_EXPORT_TIMEOUT_MILLIS);
Expand Down Expand Up @@ -64,6 +66,20 @@ long getScheduleDelayNanos() {
return scheduleDelayNanos;
}

/**
* Sets the max numbers for concurrent export. If unset, defaults to {@value
* DEFAULT_MAX_CONCURRENT_EXPORT
*/
public BatchLogRecordProcessorBuilder setMaxConcurrentExport(int maxConcurrentExport) {
checkArgument(maxConcurrentExport > 0, "maxConcurrentExport must be non-negative");
this.maxConcurrentExport = maxConcurrentExport;
return this;
}

int getMaxConcurrentExport() {
return maxConcurrentExport;
}

/**
* Sets the maximum time an export will be allowed to run before being cancelled. If unset,
* defaults to {@value DEFAULT_EXPORT_TIMEOUT_MILLIS}ms.
Expand Down Expand Up @@ -152,6 +168,7 @@ public BatchLogRecordProcessor build() {
logRecordExporter,
meterProvider,
scheduleDelayNanos,
maxConcurrentExport,
maxQueueSize,
maxExportBatchSize,
exporterTimeoutNanos);
Expand Down