Skip to content

Commit

Permalink
feat(trace): jaeger remote sampler (#797)
Browse files Browse the repository at this point in the history
  • Loading branch information
TommyCpp committed Jul 4, 2022
1 parent 103ed3a commit 703059a
Show file tree
Hide file tree
Showing 18 changed files with 966 additions and 26 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -32,6 +32,7 @@ members = [
"examples/http",
"examples/hyper-prometheus",
"examples/tracing-grpc",
"examples/jaeger-remote-sampler",
"examples/zipkin",
"examples/multiple-span-processors",
"examples/zpages"
Expand Down
12 changes: 12 additions & 0 deletions examples/jaeger-remote-sampler/Cargo.toml
@@ -0,0 +1,12 @@
[package]
name = "jaeger-remote-sampler"
version = "0.1.0"
edition = "2018"

[dependencies]
opentelemetry-sdk = { path = "../../opentelemetry-sdk", features = ["rt-tokio", "jaeger_remote_sampler"] }
opentelemetry-api = { path = "../../opentelemetry-api" }
opentelemetry-http = { path = "../../opentelemetry-http", features = ["reqwest"] }
reqwest = "0.11.10"
tokio = { version = "1.18", features = ["macros", "rt-multi-thread"] }

47 changes: 47 additions & 0 deletions examples/jaeger-remote-sampler/README.md
@@ -0,0 +1,47 @@
# Jaeger remote sampler

When services generate too many spans. We need to sample some spans to save cost and speed up the queries.

Adaptive sampling works in the Jaeger collector by observing the spans received from services and recalculating sampling
probabilities for each service/endpoint combination to ensure that the volume is relatively constant.

## Setup

Start a jaeger collector and an opentelemetry collector locally using docker

```
docker-comopse run -d
```

It will allow you to

- query sampling strategies from jaeger collect at port 5578. `http://localhost:5778/sampling?service=foo`
- query sampling strategies from opentelemetry collector at port 5579. `http://localhost:5779/sampling?service=foo`

## Run the example

After start the jaeger remote sampling server successfully. We can run

`cargo run`

command to start the example, you should see something like only one span is printed out.

Looking at the example, you will notice we use `AlwaysOff` as our default sampler. It means before the SDK get the sampling strategy from remote server, no span will be sampled.

Once the SDK fetched the remote strategy, we will start a probability sampler internally. In this case, we set the probability to 1.0 for all spans. This is defined by

```
"service": "foo",
"type": "probabilistic",
"param": 1,
```

Feel free to tune the `param` and see if the probability of sampling changes.

## Strategies

The sampling strategies is defined in `srategies.json` files. It defines two set of strategies.

The first strategy is returned for `foo` service. The second strategy is catch all default strategy for all other
services.

26 changes: 26 additions & 0 deletions examples/jaeger-remote-sampler/docker-compose.yaml
@@ -0,0 +1,26 @@
version: "3"
services:

# jaeger collector
jaeger-all-in-one:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686"
- "14268"
- "14250"
- "5778:5778"
container_name: jaeger-collector
volumes:
- ./strategies.json:/etc/jaeger/custom_strategies.json
environment:
- SAMPLING_STRATEGIES_FILE=/etc/jaeger/custom_strategies.json

# opentelemetry collector
otel-collector:
image: otel/opentelemetry-collector:latest
command: [ "--config=/etc/otel-collector.yaml" ]
volumes:
- ./otel-collector.yaml:/etc/otel-collector.yaml
- ./strategies.json:/etc/strategies.json
ports:
- "5779:5778" # default jaeger remote sampling port
17 changes: 17 additions & 0 deletions examples/jaeger-remote-sampler/otel-collector.yaml
@@ -0,0 +1,17 @@
receivers:
jaeger:
protocols:
grpc:
remote_sampling:
host_endpoint: "0.0.0.0:5778" # default port
insecure: true
strategy_file: "/etc/strategies.json"

exporters:
logging:

service:
pipelines:
traces:
receivers: [ jaeger ]
exporters: [ logging ]
41 changes: 41 additions & 0 deletions examples/jaeger-remote-sampler/src/main.rs
@@ -0,0 +1,41 @@
use opentelemetry_api::global;
use opentelemetry_api::trace::Tracer;
use opentelemetry_sdk::export::trace::stdout::Exporter as StdoutExporter;
use opentelemetry_sdk::runtime;
use opentelemetry_sdk::trace::{Sampler, TracerProvider as SdkTracerProvider};
use std::time::Duration;

fn setup() {
let client = reqwest::Client::new();

let sampler = Sampler::jaeger_remote(runtime::Tokio, client, Sampler::AlwaysOff, "foo")
.with_endpoint("http://localhost:5778/sampling") // setup jaeger remote sampler endpoint
.with_update_interval(Duration::from_secs(5)) // will call jaeger sampling endpoint every 5 secs.
.build()
.unwrap();

let config = opentelemetry_sdk::trace::config().with_sampler(sampler);

let provider = SdkTracerProvider::builder()
.with_config(config)
.with_simple_exporter(StdoutExporter::new(std::io::stdout(), true))
.build();

global::set_tracer_provider(provider);
}

#[tokio::main]
async fn main() {
setup();
let tracer = global::tracer("test");

{
let _not_sampled_span = tracer.start("test");
}

tokio::time::sleep(Duration::from_secs(10)).await;

{
let _sampled_span = tracer.start("should_record");
}
}
37 changes: 37 additions & 0 deletions examples/jaeger-remote-sampler/strategies.json
@@ -0,0 +1,37 @@
{
"service_strategies": [
{
"service": "foo",
"type": "probabilistic",
"param": 1,
"operation_strategies": [
{
"operation": "op1",
"type": "probabilistic",
"param": 0.2
},
{
"operation": "op2",
"type": "probabilistic",
"param": 0.4
}
]
}
],
"default_strategy": {
"type": "probabilistic",
"param": 0.5,
"operation_strategies": [
{
"operation": "/health",
"type": "probabilistic",
"param": 0.0
},
{
"operation": "/metrics",
"type": "probabilistic",
"param": 0.0
}
]
}
}
1 change: 0 additions & 1 deletion opentelemetry-contrib/src/trace/propagator/mod.rs
Expand Up @@ -6,7 +6,6 @@
//! Currently, the following propagators are supported:
//!
//! * `binary_propagator`, propagating trace context in the binary format.
//! * `XrayPropagator`, propagating via AWS XRay protocol.
//!
//! This module also provides relative types for those propagators.
pub mod binary;
2 changes: 1 addition & 1 deletion opentelemetry-http/Cargo.toml
Expand Up @@ -13,6 +13,6 @@ async-trait = "0.1"
bytes = "1"
http = "0.2"
isahc = { version = "1.4", default-features = false, optional = true }
opentelemetry = { version = "0.17", path = "../opentelemetry", features = ["trace"] }
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api" }
reqwest = { version = "0.11", default-features = false, features = ["blocking"], optional = true }
surf = { version = "2.0", default-features = false, optional = true }
2 changes: 1 addition & 1 deletion opentelemetry-http/src/lib.rs
Expand Up @@ -6,7 +6,7 @@ pub use bytes::Bytes;
pub use http::{Request, Response};

use async_trait::async_trait;
use opentelemetry::{
use opentelemetry_api::{
propagation::{Extractor, Injector},
trace::TraceError,
};
Expand Down
9 changes: 7 additions & 2 deletions opentelemetry-sdk/Cargo.toml
Expand Up @@ -5,7 +5,9 @@ license = "Apache-2.0"
edition = "2018"

[dependencies]
async-std = { version = "= 1.10.0", features = ["unstable"], optional = true }
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api" }
opentelemetry-http = { version = "0.6.0", path = "../opentelemetry-http", optional = true }
async-std = { version = "1.6", features = ["unstable"], optional = true }
async-trait = { version = "0.1", optional = true }
crossbeam-channel = { version = "0.5", optional = true }
dashmap = { version = "=5.1.0", optional = true }
Expand All @@ -14,14 +16,16 @@ futures-channel = "0.3"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false, features = ["std", "sink", "async-await-macro"] }
once_cell = "1.10"
opentelemetry-api = { version = "0.1", path = "../opentelemetry-api/" }
percent-encoding = { version = "2.0", optional = true }
pin-project = { version = "1.0.2", optional = true }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"], optional = true }
serde = { version = "1.0", features = ["derive", "rc"], optional = true }
serde_json = { version = "1", optional = true }
thiserror = "1"
url = { version = "2.2", optional = true }
tokio = { version = "1.0", default-features = false, features = ["rt", "time"], optional = true }
tokio-stream = { version = "0.1", optional = true }
http = { version = "0.2", optional = true }

[package.metadata.docs.rs]
all-features = true
Expand All @@ -35,6 +39,7 @@ rand_distr = "0.4.0"
[features]
default = ["trace"]
trace = ["opentelemetry-api/trace", "crossbeam-channel", "rand", "pin-project", "async-trait", "percent-encoding"]
jaeger_remote_sampler = ["trace", "opentelemetry-http", "http", "serde", "serde_json", "url"]
metrics = ["opentelemetry-api/metrics", "dashmap", "fnv"]
testing = ["opentelemetry-api/testing", "trace", "metrics", "rt-async-std", "rt-tokio", "rt-tokio-current-thread", "tokio/macros", "tokio/rt-multi-thread"]
rt-tokio = ["tokio", "tokio-stream"]
Expand Down
3 changes: 3 additions & 0 deletions opentelemetry-sdk/src/trace/mod.rs
Expand Up @@ -32,3 +32,6 @@ pub use span_processor::{
SpanProcessor,
};
pub use tracer::Tracer;

#[cfg(feature = "jaeger_remote_sampler")]
pub use sampler::JaegerRemoteSamplerBuilder;

0 comments on commit 703059a

Please sign in to comment.