forked from open-telemetry/opentelemetry-java
-
Notifications
You must be signed in to change notification settings - Fork 1
/
BatchLogRecordProcessor.java
309 lines (279 loc) · 10.9 KB
/
BatchLogRecordProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.sdk.logs.export;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.MeterProvider;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Implementation of the {@link LogRecordProcessor} that batches logs exported by the SDK then
* pushes them to the exporter pipeline.
*
* <p>All logs reported by the SDK implementation are first added to a synchronized queue (with a
* {@code maxQueueSize} maximum size, if queue is full logs are dropped). Logs are exported either
* when there are {@code maxExportBatchSize} pending logs or {@code scheduleDelayNanos} has passed
* since the last export finished.
*/
public final class BatchLogRecordProcessor implements LogRecordProcessor {

  private static final String WORKER_THREAD_NAME =
      BatchLogRecordProcessor.class.getSimpleName() + "_WorkerThread";

  // Attribute keys/values used to label the processor's self-monitoring metrics
  // ("queueSize" gauge and "processedLogs" counter) so they can be distinguished
  // from metrics emitted by other processor types.
  private static final AttributeKey<String> LOG_RECORD_PROCESSOR_TYPE_LABEL =
      AttributeKey.stringKey("logRecordProcessorType");
  private static final AttributeKey<Boolean> LOG_RECORD_PROCESSOR_DROPPED_LABEL =
      AttributeKey.booleanKey("dropped");
  private static final String LOG_RECORD_PROCESSOR_TYPE_VALUE =
      BatchLogRecordProcessor.class.getSimpleName();

  private final Worker worker;
  // Ensures the shutdown sequence (flush + exporter shutdown) runs at most once;
  // subsequent calls short-circuit to success.
  private final AtomicBoolean isShutdown = new AtomicBoolean(false);

  /**
   * Returns a new Builder for {@link BatchLogRecordProcessor}.
   *
   * @param logRecordExporter the {@link LogRecordExporter} to which the Logs are pushed
   * @return a new {@link BatchLogRecordProcessor}.
   * @throws NullPointerException if the {@code logRecordExporter} is {@code null}.
   */
  public static BatchLogRecordProcessorBuilder builder(LogRecordExporter logRecordExporter) {
    return new BatchLogRecordProcessorBuilder(logRecordExporter);
  }

  /**
   * Creates the processor and immediately starts its single background worker thread.
   *
   * @param logRecordExporter exporter invoked with each completed batch
   * @param meterProvider source of the {@link Meter} used for self-monitoring metrics
   * @param scheduleDelayNanos maximum time between exports, even when the batch is not full
   * @param maxQueueSize capacity of the buffering queue; records offered beyond this are dropped
   * @param maxExportBatchSize maximum number of records passed to the exporter per call
   * @param exporterTimeoutNanos how long to wait for each export call before giving up on it
   */
  BatchLogRecordProcessor(
      LogRecordExporter logRecordExporter,
      MeterProvider meterProvider,
      long scheduleDelayNanos,
      int maxQueueSize,
      int maxExportBatchSize,
      long exporterTimeoutNanos) {
    this.worker =
        new Worker(
            logRecordExporter,
            meterProvider,
            scheduleDelayNanos,
            maxExportBatchSize,
            exporterTimeoutNanos,
            new ArrayBlockingQueue<>(maxQueueSize)); // TODO: use JcTools.newFixedSizeQueue(..)
    // Daemon thread so a still-running worker never prevents JVM exit.
    Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
    workerThread.start();
  }

  /**
   * Queues the given record for batching and eventual export. Non-blocking; if the queue is
   * full the record is silently dropped (and counted in the "processedLogs" metric with
   * {@code dropped=true}).
   */
  @Override
  public void onEmit(Context context, ReadWriteLogRecord logRecord) {
    if (logRecord == null) {
      return;
    }
    worker.addLog(logRecord);
  }

  /**
   * Flushes any buffered records and shuts down the exporter. Idempotent: only the first call
   * performs work; later calls return an already-successful result.
   */
  @Override
  public CompletableResultCode shutdown() {
    if (isShutdown.getAndSet(true)) {
      return CompletableResultCode.ofSuccess();
    }
    return worker.shutdown();
  }

  /** Requests that the worker drain the queue and export everything currently buffered. */
  @Override
  public CompletableResultCode forceFlush() {
    return worker.forceFlush();
  }

  // Visible for testing
  ArrayList<LogRecordData> getBatch() {
    return worker.batch;
  }

  // Worker is a thread that batches multiple logs and calls the registered LogRecordExporter to
  // export the data. It is the sole consumer of the queue and the only thread that touches
  // `batch`, so neither needs additional synchronization beyond the queue itself.
  private static final class Worker implements Runnable {

    private static final Logger logger = Logger.getLogger(Worker.class.getName());

    private final LongCounter processedLogsCounter;
    private final Attributes droppedAttrs; // labels for records dropped on a full queue
    private final Attributes exportedAttrs; // labels for records successfully exported

    private final LogRecordExporter logRecordExporter;
    private final long scheduleDelayNanos;
    private final int maxExportBatchSize;
    private final long exporterTimeoutNanos;

    // Deadline (System.nanoTime basis) of the next time-triggered export. Only read/written by
    // the worker thread.
    private long nextExportTime;

    private final Queue<ReadWriteLogRecord> queue;
    // When waiting on the logs queue, exporter thread sets this atomic to the number of more
    // logs it needs before doing an export. Writer threads would then wait for the queue to reach
    // logsNeeded size before notifying the exporter thread about new entries.
    // Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since
    // exporter thread doesn't expect any signal initially, this value is initialized to
    // Integer.MAX_VALUE.
    private final AtomicInteger logsNeeded = new AtomicInteger(Integer.MAX_VALUE);
    // One-slot mailbox used to wake the worker early; offer() on a full queue is a harmless
    // no-op, so producers never block here.
    private final BlockingQueue<Boolean> signal;
    // Non-null while a flush is pending; the worker completes and clears it in flush().
    private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
    // Flipped to false during shutdown to let the worker loop exit.
    private volatile boolean continueWork = true;
    // Records staged for the next export call; owned exclusively by the worker thread.
    private final ArrayList<LogRecordData> batch;

    private Worker(
        LogRecordExporter logRecordExporter,
        MeterProvider meterProvider,
        long scheduleDelayNanos,
        int maxExportBatchSize,
        long exporterTimeoutNanos,
        Queue<ReadWriteLogRecord> queue) {
      this.logRecordExporter = logRecordExporter;
      this.scheduleDelayNanos = scheduleDelayNanos;
      this.maxExportBatchSize = maxExportBatchSize;
      this.exporterTimeoutNanos = exporterTimeoutNanos;
      this.queue = queue;
      this.signal = new ArrayBlockingQueue<>(1);
      Meter meter = meterProvider.meterBuilder("io.opentelemetry.sdk.logs").build();
      // Async gauge: reports the current queue depth each time metrics are collected.
      meter
          .gaugeBuilder("queueSize")
          .ofLongs()
          .setDescription("The number of logs queued")
          .setUnit("1")
          .buildWithCallback(
              result ->
                  result.record(
                      queue.size(),
                      Attributes.of(
                          LOG_RECORD_PROCESSOR_TYPE_LABEL, LOG_RECORD_PROCESSOR_TYPE_VALUE)));
      processedLogsCounter =
          meter
              .counterBuilder("processedLogs")
              .setUnit("1")
              .setDescription(
                  "The number of logs processed by the BatchLogRecordProcessor. "
                      + "[dropped=true if they were dropped due to high throughput]")
              .build();
      droppedAttrs =
          Attributes.of(
              LOG_RECORD_PROCESSOR_TYPE_LABEL,
              LOG_RECORD_PROCESSOR_TYPE_VALUE,
              LOG_RECORD_PROCESSOR_DROPPED_LABEL,
              true);
      exportedAttrs =
          Attributes.of(
              LOG_RECORD_PROCESSOR_TYPE_LABEL,
              LOG_RECORD_PROCESSOR_TYPE_VALUE,
              LOG_RECORD_PROCESSOR_DROPPED_LABEL,
              false);
      this.batch = new ArrayList<>(this.maxExportBatchSize);
    }

    // Called from application threads. Never blocks: a full queue means the record is dropped
    // and counted. Wakes the worker only once the queue has grown to the size it asked for,
    // keeping the signal traffic low under high throughput.
    private void addLog(ReadWriteLogRecord logData) {
      if (!queue.offer(logData)) {
        processedLogsCounter.add(1, droppedAttrs);
      } else {
        if (queue.size() >= logsNeeded.get()) {
          signal.offer(true);
        }
      }
    }

    @Override
    public void run() {
      updateNextExportTime();
      while (continueWork) {
        // A pending flush takes priority over normal batching.
        if (flushRequested.get() != null) {
          flush();
        }
        // Drain the queue into the staging batch, up to one batch worth of records.
        while (!queue.isEmpty() && batch.size() < maxExportBatchSize) {
          batch.add(queue.poll().toLogRecordData());
        }
        // Export when the batch is full or the schedule-delay deadline has passed.
        if (batch.size() >= maxExportBatchSize || System.nanoTime() >= nextExportTime) {
          exportCurrentBatch();
          updateNextExportTime();
        }
        if (queue.isEmpty()) {
          try {
            long pollWaitTime = nextExportTime - System.nanoTime();
            if (pollWaitTime > 0) {
              // Publish how many more records we need before producers should wake us, then
              // sleep until signaled or the export deadline arrives. The reset to MAX_VALUE
              // afterwards tells producers no signal is expected.
              logsNeeded.set(maxExportBatchSize - batch.size());
              signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
              logsNeeded.set(Integer.MAX_VALUE);
            }
          } catch (InterruptedException e) {
            // Restore the interrupt flag and let the worker thread die.
            Thread.currentThread().interrupt();
            return;
          }
        }
      }
    }

    // Drains everything that was in the queue at the moment the flush started (snapshot via
    // queue.size(); records arriving afterwards are left for normal batching), exports it in
    // maxExportBatchSize chunks, then completes the pending flush result.
    private void flush() {
      int logsToFlush = queue.size();
      while (logsToFlush > 0) {
        ReadWriteLogRecord logRecord = queue.poll();
        // Single consumer: nobody else polls the queue, so size() records are guaranteed
        // to still be present.
        assert logRecord != null;
        batch.add(logRecord.toLogRecordData());
        logsToFlush--;
        if (batch.size() >= maxExportBatchSize) {
          exportCurrentBatch();
        }
      }
      // Export the final partial batch before reporting completion.
      exportCurrentBatch();
      CompletableResultCode flushResult = flushRequested.get();
      if (flushResult != null) {
        // NOTE: the flush is reported as succeeded even if an individual export failed;
        // export failures are only logged/counted in exportCurrentBatch().
        flushResult.succeed();
        flushRequested.set(null);
      }
    }

    private void updateNextExportTime() {
      nextExportTime = System.nanoTime() + scheduleDelayNanos;
    }

    // Asynchronous shutdown: flush first, then shut the exporter down, and only then complete
    // the returned result. The worker loop is stopped once the flush has finished.
    private CompletableResultCode shutdown() {
      CompletableResultCode result = new CompletableResultCode();
      CompletableResultCode flushResult = forceFlush();
      flushResult.whenComplete(
          () -> {
            continueWork = false;
            CompletableResultCode shutdownResult = logRecordExporter.shutdown();
            shutdownResult.whenComplete(
                () -> {
                  // The overall result fails if either stage failed.
                  if (!flushResult.isSuccess() || !shutdownResult.isSuccess()) {
                    result.fail();
                  } else {
                    result.succeed();
                  }
                });
          });
      return result;
    }

    // May be called from any thread. At most one flush is pending at a time: the CAS ensures
    // concurrent callers share the same in-flight result.
    private CompletableResultCode forceFlush() {
      CompletableResultCode flushResult = new CompletableResultCode();
      // we set the atomic here to trigger the worker loop to do a flush of the entire queue.
      if (flushRequested.compareAndSet(null, flushResult)) {
        signal.offer(true);
      }
      CompletableResultCode possibleResult = flushRequested.get();
      // there's a race here where the flush happening in the worker loop could complete before we
      // get what's in the atomic. In that case, just return success, since we know it succeeded in
      // the interim.
      return possibleResult == null ? CompletableResultCode.ofSuccess() : possibleResult;
    }

    // Hands the staged batch to the exporter (as an unmodifiable view), waits up to the
    // configured timeout, records the outcome metric, and always clears the batch so a
    // failed export cannot be retried with stale data.
    private void exportCurrentBatch() {
      if (batch.isEmpty()) {
        return;
      }
      try {
        CompletableResultCode result =
            logRecordExporter.export(Collections.unmodifiableList(batch));
        result.join(exporterTimeoutNanos, TimeUnit.NANOSECONDS);
        if (result.isSuccess()) {
          processedLogsCounter.add(batch.size(), exportedAttrs);
        } else {
          logger.log(Level.FINE, "Exporter failed");
        }
      } catch (RuntimeException e) {
        // Defensive: a misbehaving exporter must not kill the worker thread.
        logger.log(Level.WARNING, "Exporter threw an Exception", e);
      } finally {
        batch.clear();
      }
    }
  }
}