Skip to content

Commit

Permalink
Run exports with the Context the SpanProcessor was created in (#880)
Browse files Browse the repository at this point in the history
* Run exports in spanprocessor construction context

* Resolve psalm issues
  • Loading branch information
Nevay committed Dec 2, 2022
1 parent 0ce2fda commit fd8ac30
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 16 deletions.
19 changes: 13 additions & 6 deletions src/SDK/Trace/SpanProcessor/BatchSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use InvalidArgumentException;
use OpenTelemetry\API\Metrics\MeterProviderInterface;
use OpenTelemetry\API\Metrics\ObserverInterface;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\ContextInterface;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
Expand Down Expand Up @@ -44,6 +45,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
private int $scheduledDelayNanos;
private int $maxExportBatchSize;
private bool $autoFlush;
private ContextInterface $exportContext;

private ?int $nextScheduledRun = null;
private bool $running = false;
Expand All @@ -55,7 +57,7 @@ class BatchSpanProcessor implements SpanProcessorInterface
private array $batch = [];
/** @var SplQueue<list<SpanDataInterface>> */
private SplQueue $queue;
/** @var SplQueue<array{int, string, ?CancellationInterface, bool}> */
/** @var SplQueue<array{int, string, ?CancellationInterface, bool, ContextInterface}> */
private SplQueue $flush;

private bool $closed = false;
Expand Down Expand Up @@ -93,6 +95,7 @@ public function __construct(
$this->maxExportBatchSize = $maxExportBatchSize;
$this->autoFlush = $autoFlush;

$this->exportContext = Context::getCurrent();
$this->queue = new SplQueue();
$this->flush = new SplQueue();

Expand Down Expand Up @@ -199,7 +202,7 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc
{
if ($flushMethod !== null) {
$flushId = $this->batchId + $this->queue->count() + (int) (bool) $this->batch;
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running]);
$this->flush->enqueue([$flushId, $flushMethod, $cancellation, !$this->running, Context::getCurrent()]);
}

if ($this->running) {
Expand All @@ -213,7 +216,8 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc
try {
for (;;) {
while (!$this->flush->isEmpty() && $this->flush->bottom()[0] <= $this->batchId) {
[, $flushMethod, $cancellation, $propagateResult] = $this->flush->dequeue();
[, $flushMethod, $cancellation, $propagateResult, $context] = $this->flush->dequeue();
$scope = $context->activate();

try {
$result = $this->exporter->$flushMethod($cancellation);
Expand All @@ -223,10 +227,11 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc
} catch (Throwable $e) {
if ($propagateResult) {
$exception = $e;

continue;
} else {
self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
}
self::logError(sprintf('Unhandled %s error', $flushMethod), ['exception' => $e]);
} finally {
$scope->detach();
}
}

Expand All @@ -239,6 +244,7 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc
}
$batchSize = count($this->queue->bottom());
$this->batchId++;
$scope = $this->exportContext->activate();

try {
$this->exporter->export($this->queue->dequeue())->await();
Expand All @@ -247,6 +253,7 @@ private function flush(?string $flushMethod = null, ?CancellationInterface $canc
} finally {
$this->processed += $batchSize;
$this->queueSize -= $batchSize;
$scope->detach();
}
}
} finally {
Expand Down
25 changes: 15 additions & 10 deletions src/SDK/Trace/SpanProcessor/SimpleSpanProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace OpenTelemetry\SDK\Trace\SpanProcessor;

use Closure;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\ContextInterface;
use OpenTelemetry\SDK\Behavior\LogsMessagesTrait;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
Expand All @@ -21,9 +22,10 @@ class SimpleSpanProcessor implements SpanProcessorInterface
use LogsMessagesTrait;

private SpanExporterInterface $exporter;
private ContextInterface $exportContext;

private bool $running = false;
/** @var SplQueue<array{Closure, string, bool}> */
/** @var SplQueue<array{Closure, string, bool, ContextInterface}> */
private SplQueue $queue;

private bool $closed = false;
Expand All @@ -32,6 +34,7 @@ public function __construct(SpanExporterInterface $exporter)
{
$this->exporter = $exporter;

$this->exportContext = Context::getCurrent();
$this->queue = new SplQueue();
}

Expand All @@ -49,7 +52,7 @@ public function onEnd(ReadableSpanInterface $span): void
}

$spanData = $span->toSpanData();
$this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export');
$this->flush(fn () => $this->exporter->export([$spanData])->await(), 'export', false, $this->exportContext);
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
Expand All @@ -58,7 +61,7 @@ public function forceFlush(?CancellationInterface $cancellation = null): bool
return false;
}

return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true);
return $this->flush(fn (): bool => $this->exporter->forceFlush($cancellation), __FUNCTION__, true, Context::getCurrent());
}

public function shutdown(?CancellationInterface $cancellation = null): bool
Expand All @@ -69,12 +72,12 @@ public function shutdown(?CancellationInterface $cancellation = null): bool

$this->closed = true;

return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true);
return $this->flush(fn (): bool => $this->exporter->shutdown($cancellation), __FUNCTION__, true, Context::getCurrent());
}

private function flush(Closure $task, string $taskName, bool $propagateResult = false): bool
private function flush(Closure $task, string $taskName, bool $propagateResult, ContextInterface $context): bool
{
$this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running]);
$this->queue->enqueue([$task, $taskName, $propagateResult && !$this->running, $context]);

if ($this->running) {
return false;
Expand All @@ -86,7 +89,8 @@ private function flush(Closure $task, string $taskName, bool $propagateResult =

try {
while (!$this->queue->isEmpty()) {
[$task, $taskName, $propagateResult] = $this->queue->dequeue();
[$task, $taskName, $propagateResult, $context] = $this->queue->dequeue();
$scope = $context->activate();

try {
$result = $task();
Expand All @@ -96,10 +100,11 @@ private function flush(Closure $task, string $taskName, bool $propagateResult =
} catch (Throwable $e) {
if ($propagateResult) {
$exception = $e;

continue;
} else {
self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]);
}
self::logError(sprintf('Unhandled %s error', $taskName), ['exception' => $e]);
} finally {
$scope->detach();
}
}
} finally {
Expand Down

0 comments on commit fd8ac30

Please sign in to comment.