
Support concurrent exports #781

Merged

merged 11 commits into open-telemetry:main from concurrent-exports on May 17, 2022

Conversation

jwilm
Contributor

@jwilm jwilm commented Apr 14, 2022

Problem

Applications generating significant span volume can end up dropping data due to the synchronous export step. According to the opentelemetry spec,

This function [export()] will never be called concurrently for the same exporter instance. It can be called again only after the current call returns.

However, it does not place a restriction on concurrent I/O or anything of that nature. There is an ongoing discussion about tweaking the language to make this clearer, and there seems to be consensus that concurrent transmissions are compatible with the spec.

Approach

With that in mind, this commit makes the exporters return a future that can be spawned concurrently. Unfortunately, this means that the export() method can no longer be async while taking &mut self. The latter is desirable to enforce the no concurrent calls line of the spec, so the choice is made here to return a future instead with the lifetime decoupled from self. This resulted in a bit of additional verbosity, but for the most part the async code can still be shoved into an async fn for the ergonomics.
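To make the shape of the change concrete, here is a minimal sketch with placeholder types (not the exact trait from this PR):

```rust
use futures_util::future::BoxFuture;

// Placeholder stand-ins for the SDK's batch and result types.
type SpanData = ();
type ExportResult = Result<(), Box<dyn std::error::Error + Send + Sync>>;

trait SpanExporter: Send {
    // Before (conceptually): `async fn export(&mut self, batch: Vec<SpanData>) -> ExportResult`.
    // After: return a future whose lifetime is decoupled from `&mut self`, so the
    // span processor is free to spawn several exports and poll them concurrently.
    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult>;
}

struct HttpExporter {
    endpoint: String,
}

impl SpanExporter for HttpExporter {
    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
        // Clone whatever the request needs so the returned future owns its data.
        let endpoint = self.endpoint.clone();
        Box::pin(async move {
            // The body can still live in an async block/fn for ergonomics.
            let _ = (endpoint, batch);
            Ok(())
        })
    }
}
```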

The jaeger exporter is left untouched for now as the changes here might take a couple of different approaches (decouple exporter via a channel, or a bigger refactor to fix the &mut borrows deep in the exporter logic).

Discussion

I'm opening this PR as a draft to start some discussion with the OpenTelemetry-Rust maintainers. To kick this off, I have results to share from a particularly busy application of ours which has been dropping tons of spans due to reaching queue capacity:

[Image: graph of dropped-span occurrences over a 2-day period, before and after this change]

This is a 2 day period. To the left, we have millions of spans dropped because the queue was not being flushed fast enough. To the right, there are nearly 0 occurrences of dropped spans due to this issue.

For us, this is a massive improvement.

A couple of questions for the maintainers:

  1. Is this sort of feature interesting to the project? I recognize that not everyone will need this, but at least for us, it is quite important (to the point we would likely maintain a patch to keep this functionality).
  2. Assuming the feature is interesting for the project, how do you feel about the overall approach?
  3. Finally, I could use some input on the jaeger side if there is interest in landing this patch. This exporter is quite a bit more complex and relies heavily on &mut borrows several layers down.

Remaining Work

  • Join spawned tasks on shutdown()
  • Configuration for the number of spawned export tasks (export_concurrency)
  • Limit spawned export tasks to export_concurrency
  • Synchronous behavior when export_concurrency: 1
  • Attempt to drain queue on force_flush
  • Jaeger exporter
  • Clean up TODOs

@linux-foundation-easycla

linux-foundation-easycla bot commented Apr 14, 2022

CLA Signed

The committers listed above are authorized under a signed CLA.

@TommyCpp
Contributor

Thanks for the feedback 👍, I agree with the approach in principle. We had interpreted the spec as meaning that BatchSpanProcessor must finish sending one batch of spans before starting another.

My concern is, as others pointed out in the spec issue, that without a bound this can cause an OOM. We can add a parameter to configure the number of concurrent sending tasks. If the parameter is set to 1, then concurrent sending is disabled.

Another issue: based on the current model, the shutdown function will return after the last batch is exported. If we are exporting multiple batches concurrently, we need to make sure they have all finished exporting before returning from shutdown. The same applies to the force_flush function (we may need to define what flush means here: is it starting the export or finishing the export?).

Jaeger is a little more complex. I think the &mut borrow is caused by the emit_batch method of the generated thrift agent. We may need to propagate futures from the uploaders. I need to take a closer look to figure out the best approach here.

PS. For some reason, your link to the spec issue is not working. open-telemetry/opentelemetry-specification#2434.

@jwilm
Contributor Author

jwilm commented Apr 15, 2022

Thanks for the quick reply!

My concern is, as others pointed out in the spec issue, that without a bound this can cause an OOM. We can add a parameter to configure the number of concurrent sending tasks. If the parameter is set to 1, then concurrent sending is disabled.

Makes sense; this would be fairly straightforward to add.

Another issue: based on the current model, the shutdown function will return after the last batch is exported. If we are exporting multiple batches concurrently, we need to make sure they have all finished exporting before returning from shutdown.

The shutdown "joining" logic makes sense; a bit embarrassed I didn't consider that up front. Also should be relatively simple to add.

The same applies to the force_flush function (we may need to define what flush means here: is it starting the export or finishing the export?).

My 2c: in the non-concurrent model, a flush meant starting a request for up to the batch size. In the concurrent model, it's still possible to have items pending in the queue up to the timeout; to me it still makes sense to treat a flush as the start of an export.

Jaeger is a little more complex. I think the &mut borrow is caused by the emit_batch method of the generated thrift agent. We may need to propagate futures from the uploaders. I need to take a closer look to figure out the best approach here.

That's right. The generated thrift agent has some mutable methods for working with the I/O streams and some sort of frame counter. It's by no means untenable, but it's certainly more work than the other exporters (hence the draft for discussion).

PS. For some reason, your link to the spec issue is not working. open-telemetry/opentelemetry-specification#2434.

Fixed, thank you.

Would it be premature for me to continue with the suggested fixes?

@jtescher
Member

@jwilm these all sound like good improvements to me 👍

@TommyCpp
Contributor

TommyCpp commented Apr 17, 2022

I think we can implement the proposed changes. One thing: your branch seems stale, and I'd suggest rebasing on the latest main to pick up some significant changes, like the split of opentelemetry into opentelemetry-sdk and opentelemetry-api. It should be easier than merging upstream after your changes.

@jwilm
Contributor Author

jwilm commented Apr 20, 2022

I rebased the existing work on main, and I'm starting on the proposed changes (will track on OP).

In the meantime, I would appreciate some input on the Jaeger exporter. I noticed that the stackdriver exporter spawns a separate task for the actual export work, and the SpanExporter impl simply sends the batch via a channel. Perhaps the same approach would be suitable for Jaeger?
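For concreteness, the channel-decoupling pattern looks roughly like this (an illustrative sketch with stand-in names, not the actual stackdriver code): the exporter only sends the batch into a channel, and a driver future, which the caller spawns on its runtime, owns all of the mutable upload state.

```rust
use futures_channel::mpsc;
use futures_util::StreamExt;

type SpanData = ();

struct ChannelExporter {
    tx: mpsc::Sender<Vec<SpanData>>,
}

impl ChannelExporter {
    // Returns the exporter plus a driver future the caller must spawn on its runtime.
    fn new(buffer: usize) -> (Self, impl std::future::Future<Output = ()>) {
        let (tx, mut rx) = mpsc::channel::<Vec<SpanData>>(buffer);
        let driver = async move {
            // All mutable upload state lives on this single task, so the exporter
            // itself never needs long-lived &mut borrows across await points.
            while let Some(batch) = rx.next().await {
                let _ = batch; // perform the actual upload here
            }
        };
        (ChannelExporter { tx }, driver)
    }
}
```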

@jwilm
Contributor Author

jwilm commented Apr 21, 2022

I believe all the requested changes have been implemented, save for the Jaeger exporter.

Considering the implementation path there, I think a task queue system could be utilized where the Jaeger exporter will only export a single batch at a time (regardless of BSP settings). This seems like the path of least resistance to landing this PR, but it also means that the Jaeger exporter doesn't actually gain any concurrency. Perhaps this is OK for now and Jaeger concurrency can be a future enhancement?

Separately, I took a closer look at the stackdriver exporter, and I think it could easily take advantage of the BSP concurrency management instead of rolling its own. Let me know if you'd like that done.

Waiting on your feedback before proceeding.

Thanks!

@jwilm jwilm force-pushed the concurrent-exports branch 2 times, most recently from 385caa2 to 6f75887 on April 21, 2022 19:16

Ok(builder.build())
}
// pub fn build_simple(mut self) -> Result<TracerProvider, TraceError> {
Contributor Author

These build simple / sync methods are the last blocker for the jaeger implementation. In order to support non-pipelined I/O resources like the Agent uploaders, the Exporter and Uploaders were decoupled via a channel, and the uploaders now run on a dedicated task. However, this means that Exporter::new has to return a task to spawn as well, similar to the stackdriver exporter. This is problematic for these build_simple, etc. methods since there's no runtime available on which to spawn that task.

I'll need feedback on whether we can simply thread the Runtime through here and whether this design is acceptable.

Contributor Author

I updated the commit to thread the runtime through, but I don't have enough context in the project to know if this is the right approach.

Contributor

Thanks for the hard work! Sorry, I didn't get much time yesterday to take a look. I will try to take a look tomorrow or over the weekend. We do want to keep install_simple without a runtime parameter, just so that you can work with opentelemetry without a runtime. But I need a closer look to see whether it's possible to do that under the new exporter APIs.

Contributor Author

Is it just that method in particular, or is the goal that the opentelemetry library can be used without a runtime? If it's just that method, we can probably figure something out using the cfg(feature) attrs internally.

Contributor

I think in general we want to allow people to use it without a runtime. As a util library, we shouldn't assume the runtime and deny certain use cases.

@jwilm jwilm force-pushed the concurrent-exports branch 2 times, most recently from 2d1df86 to 0617ca8 on April 21, 2022 20:07
@jwilm jwilm marked this pull request as ready for review April 21, 2022 20:48
@jwilm jwilm requested a review from a team as a code owner April 21, 2022 20:48
@codecov

codecov bot commented Apr 22, 2022

Codecov Report

Merging #781 (b647016) into main (02e15b2) will decrease coverage by 0.4%.
The diff coverage is 39.3%.

@@           Coverage Diff           @@
##            main    #781     +/-   ##
=======================================
- Coverage   70.2%   69.8%   -0.5%     
=======================================
  Files        109     109             
  Lines       8963    9045     +82     
=======================================
+ Hits        6293    6314     +21     
- Misses      2670    2731     +61     
Impacted Files Coverage Δ
opentelemetry-datadog/src/exporter/mod.rs 22.4% <0.0%> (-2.0%) ⬇️
opentelemetry-jaeger/src/lib.rs 93.0% <ø> (ø)
opentelemetry-otlp/src/span.rs 0.0% <0.0%> (ø)
opentelemetry-sdk/src/export/trace/mod.rs 100.0% <ø> (ø)
opentelemetry-sdk/src/export/trace/stdout.rs 9.0% <0.0%> (-0.6%) ⬇️
opentelemetry-zipkin/src/exporter/mod.rs 0.0% <0.0%> (ø)
opentelemetry-zipkin/src/exporter/uploader.rs 0.0% <0.0%> (ø)
opentelemetry-jaeger/src/exporter/mod.rs 51.0% <23.6%> (-4.6%) ⬇️
opentelemetry-sdk/src/runtime.rs 69.7% <50.0%> (ø)
opentelemetry-sdk/src/trace/span_processor.rs 79.8% <72.1%> (-2.2%) ⬇️
... and 15 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 02e15b2...b647016.

@TommyCpp
Contributor

TommyCpp commented Apr 22, 2022

Considering the implementation path there, I think a task queue system could be utilized where the Jaeger exporter will only export a single batch at a time (regardless of BSP settings). This seems like the path of least resistance to landing this PR, but it also means that the Jaeger exporter doesn't actually gain any concurrency. Perhaps this is OK for now and Jaeger concurrency can be a future enhancement?

I think the easiest way is to wrap the uploader in a mutex? I think the ideal solution is to replace the Uploader trait with two enums/traits that separate the thread-safe uploader (which supports concurrent exporting) from the non-thread-safe one (which needs to export sequentially).
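As an illustrative sketch of the mutex idea (stand-in types, not code from this PR): the exporter clones an Arc so the returned future can be 'static, and an async mutex serializes access to the non-thread-safe uploader.

```rust
use std::sync::Arc;
use futures_util::future::BoxFuture;
use futures_util::lock::Mutex;

struct Uploader; // stand-in for the non-thread-safe thrift uploader
impl Uploader {
    async fn upload(&mut self, _batch: Vec<()>) {}
}

struct MutexExporter {
    uploader: Arc<Mutex<Uploader>>,
}

impl MutexExporter {
    fn export(&mut self, batch: Vec<()>) -> BoxFuture<'static, ()> {
        let uploader = Arc::clone(&self.uploader);
        Box::pin(async move {
            // Only one batch is uploaded at a time; concurrent exports queue on
            // the lock, which is also why upload ordering is no longer guaranteed.
            uploader.lock().await.upload(batch).await;
        })
    }
}
```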

Separately, I took a closer look at the stackdriver exporter, and I think it could easily take advantage of the BSP concurrency management instead of rolling its own. Let me know if you'd like that done.

I think it makes sense. Curious what your thoughts are, @djc. Also, we can open a separate PR to address this.

@djc
Contributor

djc commented Apr 22, 2022

Separately, I took a closer look at the stackdriver exporter, and I think it could easily take advantage of the BSP concurrency management instead of rolling its own. Let me know if you'd like that done.

Sounds good, are you actually going to be using stackdriver or are you just cleaning stuff up here? :)

@jwilm
Contributor Author

jwilm commented Apr 23, 2022

I think the easiest way is to wrap the uploader in a mutex?

I considered that, but it has some downsides including loss of ordering of uploads. The task queue seemed cleaner, but it did necessitate spawning a future somehow. I suppose another option is spawning a thread for the non-async/await uploader(s).

I think the ideal solution is to replace the Uploader trait

Would it be possible to expose two different exporters from Jaeger? This might open up the design space a bit more.

Sounds good, are you actually going to be using stackdriver or are you just cleaning stuff up here? :)

Just an offer to clean up :)

@TommyCpp
Contributor

TommyCpp commented Apr 23, 2022

but it has some downsides including loss of ordering of uploads

I think if we allow concurrent exporting, we won't be able to guarantee the strict order of spans, i.e. multiple requests in flight could finish in arbitrary order. I need to double-check how Jaeger or other backends handle this, but I don't think it is a no-go. I will report back with what I find.

I suppose another option is spawning a thread for the non-async/await uploader(s).

Yeah, I think we can try this, although it is against best practice when working with an async runtime.

@djc
Contributor

djc commented Apr 25, 2022

Just an offer to clean up :)

Would be great, thanks!

@TommyCpp
Contributor

TommyCpp commented May 1, 2022

I need to double-check how Jaeger or other backends handle this, but I don't think it is a no-go. I will report back with what I find.

I tested with Jaeger and it seems OK for it to ingest spans out of order. I am OK if we spawn a thread or use a mutex for now. As long as we don't make an API change, we can work on improving the internals in a follow-up PR.

Would it be possible to expose two different exporters from Jaeger

I think it's possible, but it's probably better to address it in a different PR :)

Thanks again for the wonderful work

@jwilm
Contributor Author

jwilm commented May 5, 2022

I think there's a path to keeping the task based approach if the only blocker is the install_simple method not taking a runtime. Given that the core library requires some sort of async runtime, we could potentially detect that based on cargo features and insert the runtime internally rather than requiring it as an argument in the public API.
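For example, something along these lines could work (purely a sketch; the feature and type names are illustrative, not the crate's actual cargo features):

```rust
// Hypothetical: choose a default runtime from cargo features so install_simple()
// does not need a runtime parameter. Feature and type names are illustrative.
#[cfg(feature = "rt-tokio")]
mod default_runtime {
    pub struct Tokio;
    pub fn get() -> Tokio {
        Tokio
    }
}

#[cfg(all(feature = "rt-async-std", not(feature = "rt-tokio")))]
mod default_runtime {
    pub struct AsyncStd;
    pub fn get() -> AsyncStd {
        AsyncStd
    }
}

// install_simple() would call default_runtime::get() internally; when no runtime
// feature is enabled it could fall back to a fully synchronous (thread-based) path.
```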

What do you think?
Thanks!

@@ -38,6 +38,7 @@ http = "0.2"
reqwest = { version = "0.11", optional = true, default-features = false }
surf = { version = "2.0", optional = true, default-features = false }
thiserror = { version = "1.0"}
futures = "0.3"
Contributor

Can we save on dependencies by using futures-util (or even futures-core) for most of these?

Contributor Author

With the exception of the current Jaeger exporter implementation, we should be able to get away with using only futures-util. I'll make that change.

Contributor Author

Ah, and maybe in some cases even futures-core (at least for Zipkin)

Contributor Author

Done in the latest commit.

jwilm added 3 commits May 5, 2022 15:04
Applications generating significant span volume can end up dropping data
due to the synchronous export step. According to the opentelemetry spec,

    This function will never be called concurrently for the same exporter
    instance. It can be called again only after the current call returns.

However, it does not place a restriction on concurrent I/O or anything
of that nature. There is an [ongoing discussion] about tweaking the
language to make this more clear.

With that in mind, this commit makes the exporters return a future that
can be spawned concurrently. Unfortunately, this means that the
`export()` method can no longer be async while taking &mut self. The
latter is desirable to enforce the no concurrent calls line of the spec,
so the choice is made here to return a future instead with the lifetime
decoupled from self. This resulted in a bit of additional verbosity, but
for the most part the async code can still be shoved into an async fn
for the ergonomics.

The main exception to this is the `jaeger` exporter which internally
requires a bunch of mutable references. I plan to discuss with the
opentelemetry team the overall goal of this PR and get buy-in before
making more invasive changes to support this in the jaeger exporter.

[ongoing discussion]: open-telemetry/opentelemetry-specification#2434
Prior, export tasks were run in "fire and forget" mode with
runtime::spawn. SpanProcessor now manages tasks directly using
FuturesUnordered. This enables limiting overall concurrency (and thus
memory footprint). Additionally, flush and shutdown logic now spawn an
additional task for any unexported spans and wait on _all_ outstanding
tasks to complete before returning.
Users may desire to control the level of export concurrency in the batch
span processor. There are two special values:

    max_concurrent_exports = 0: no bound on concurrency
    max_concurrent_exports = 1: no concurrency, makes everything
    synchronous on the messaging task.
Key points
- decouple exporter from uploaders via channel and spawned task
- some uploaders are a shared I/O resource and cannot be multiplexed
    - necessitates a task queue
    - eg, HttpClient will spawn many I/O tasks internally, AgentUploader
      is a single I/O resource. Different level of abstraction.
- Synchronous API not supported without a Runtime argument. I updated
  the API to thread one through, but maybe this is undesirable. I'm also
  exploiting the fact in the Actix examples that it uses Tokio under the
  hood to pass through the Tokio runtime token.
- Tests pass save for a couple of flakey environment ones which is
  likely a race condition.
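One of the commit messages above describes managing export tasks with FuturesUnordered to bound concurrency. A simplified, illustrative sketch of that idea (not the span processor's actual message loop):

```rust
use futures_util::stream::{FuturesUnordered, StreamExt};

// max_concurrent = 0 means "no bound", matching the semantics described above.
async fn drive_exports<F>(mut pending: Vec<F>, max_concurrent: usize)
where
    F: std::future::Future<Output = ()>,
{
    let mut in_flight = FuturesUnordered::new();
    while let Some(export) = pending.pop() {
        // At the limit? Wait for one in-flight export to finish before adding more.
        if max_concurrent != 0 && in_flight.len() >= max_concurrent {
            in_flight.next().await;
        }
        in_flight.push(export);
    }
    // Drain everything still in flight; flush/shutdown must do the same so no
    // spans are dropped on exit.
    while in_flight.next().await.is_some() {}
}
```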
@jwilm
Contributor Author

jwilm commented May 5, 2022

This should be ready for another review. The Jaeger API is as it was before, and a thread is used internally to handle the task execution.

As future work, I think it would go a long way to have two different exporters for Jaeger: one which does the exports directly, synchronously and without a thread, and a more production-oriented asynchronous and concurrent exporter.
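Roughly, the thread-based internals look like this (an illustrative sketch with stand-in types, not the actual Jaeger exporter code):

```rust
use std::sync::mpsc;
use std::thread;

type Batch = Vec<()>; // stand-in for a batch of Jaeger spans

struct ThreadedUploader {
    tx: Option<mpsc::Sender<Batch>>,
    handle: Option<thread::JoinHandle<()>>,
}

impl ThreadedUploader {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<Batch>();
        let handle = thread::spawn(move || {
            // Batches are handled strictly in order on this one thread, so the
            // non-thread-safe uploader never sees concurrent calls.
            for batch in rx {
                let _ = batch; // hand the batch to the uploader here
            }
        });
        ThreadedUploader { tx: Some(tx), handle: Some(handle) }
    }

    fn export(&self, batch: Batch) {
        if let Some(tx) = &self.tx {
            let _ = tx.send(batch);
        }
    }

    fn shutdown(&mut self) {
        // Dropping the sender ends the receive loop; joining ensures every
        // queued batch has been handled before shutdown returns.
        self.tx.take();
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}
```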

Contributor

@TommyCpp TommyCpp left a comment

I think it looks really great and should be a great improvement 👍. We can look into Jaeger more and optimize the jaeger exporter in a follow-up PR.

opentelemetry-sdk/src/trace/span_processor.rs (review thread outdated, resolved)
@@ -2,7 +2,7 @@

set -eu

cargo test --all "$@"
cargo test --all "$@" -- --test-threads=1
Contributor

Any reason why we need the test threads to be 1 here? Does running them in parallel cause some issues?

Contributor Author

Ah yeah, I noticed there were some flaky tests that modify the environment and collide with each other.

@jwilm jwilm requested a review from TommyCpp May 6, 2022 18:38
scripts/lint.sh Outdated
cargo_feature opentelemetry-jaeger "wasm_collector_client"
cargo_feature opentelemetry-jaeger "collector_client, wasm_collector_client"
cargo_feature opentelemetry-jaeger "default"
cargo_feature opentelemetry-jaeger "full"
Contributor

It is nice to have the full feature, but we probably want to keep those separate tests to make sure the individual features also work on their own.

Contributor Author

Done.

Contributor

@TommyCpp TommyCpp left a comment

Nice work 👍 Just one nit and fixing the lint/test, then I think we are good to merge.

jwilm added 6 commits May 10, 2022 18:13
The minimal necessary futures library (core, util, futures proper) is
now used in all packages touched by the concurrent exporters work.
To keep the API _actually_ simple, we now leverage a thread to run the
jaeger exporter internals.
Per PR feedback, the default should match the previous behavior of 1
batch at a time.
This finishes the remaining TODOs on the concurrent-exports branch. The
major change included here adds shutdown functionality to the jaeger
exporter which ensures the exporter has finished its tasks before
exiting.
This was erroneously committed.
OTEL_BSP_MAX_CONCURRENT_EXPORTS may now be specified in the environment
to configure the number of max concurrent exports. This configurable now
has parity with the other options of the span_processor.
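As a usage note for the last commit above (illustrative only; the builder method name in the comment is an assumption, not a confirmed API):

```rust
fn main() {
    // The batch span processor reads this from the process environment, e.g.:
    //   OTEL_BSP_MAX_CONCURRENT_EXPORTS=4 ./my-service
    // Per the commit messages above, 1 keeps the previous synchronous behavior
    // (and is the default), while 0 means no bound on concurrency.
    let max_concurrent_exports: usize = std::env::var("OTEL_BSP_MAX_CONCURRENT_EXPORTS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(1);

    // Hypothetical builder equivalent (method name assumed, not confirmed):
    // let config = BatchConfig::default().with_max_concurrent_exports(max_concurrent_exports);
    println!("max concurrent exports: {max_concurrent_exports}");
}
```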
Contributor

@TommyCpp TommyCpp left a comment

LGTM 🎉

@TommyCpp
Contributor

Any other suggestions or do you think it's good to merge? cc @jtescher @djc

@djc
Contributor

djc commented May 13, 2022

I don't have bandwidth to review this in detail, sorry.

@jwilm
Contributor Author

jwilm commented May 16, 2022

Anything I can do to help land this?

@TommyCpp TommyCpp merged commit 7534891 into open-telemetry:main May 17, 2022
@jwilm
Contributor Author

jwilm commented May 20, 2022

🎉 thanks for the merge!

Are there plans to publish a new release to crates.io anytime soon? We would love to get off our git dependencies.

@TommyCpp
Contributor

I hope we can release a new 0.18 version sometime soon. You can track progress in #779.
