Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Implement spans exporting for ClickHouse storage in Jaeger V2 #4941

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 20 additions & 3 deletions cmd/jaeger/internal/exporters/storageexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

type storageExporter struct {
config *Config
logger *zap.Logger
spanWriter spanstore.Writer
clickhouse bool
// Separate traces exporting function for ClickHouse storage.
// This is temporary until we have v2 storage API.
chExportTraces func(ctx context.Context, td ptrace.Traces) error
}

func newExporter(config *Config, otel component.TelemetrySettings) *storageExporter {
Expand All @@ -30,14 +35,22 @@ func newExporter(config *Config, otel component.TelemetrySettings) *storageExpor
}
}

func (exp *storageExporter) start(_ context.Context, host component.Host) error {
func (exp *storageExporter) start(ctx context.Context, host component.Host) error {
f, err := jaegerstorage.GetStorageFactory(exp.config.TraceStorage, host)
if err != nil {
return fmt.Errorf("cannot find storage factory: %w", err)
}

if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
switch t := f.(type) {
case *ch.Factory:
haanhvu marked this conversation as resolved.
Show resolved Hide resolved
exp.clickhouse = true
t.CreateSpansTable(ctx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't we do this inside clickhouse.NewFactory? storageexporter doesn't need to know about internal details of each storage implementation

exp.chExportTraces = t.ExportSpans
default:
exp.clickhouse = false
if exp.spanWriter, err = f.CreateSpanWriter(); err != nil {
return fmt.Errorf("cannot create span writer: %w", err)
}
}

return nil
Expand All @@ -49,6 +62,10 @@ func (exp *storageExporter) close(_ context.Context) error {
}

func (exp *storageExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
if exp.clickhouse {
return exp.chExportTraces(ctx, td)
}

batches, err := otlp2jaeger.ProtoFromTraces(td)
if err != nil {
return fmt.Errorf("cannot transform OTLP traces to Jaeger format: %w", err)
Expand Down
3 changes: 2 additions & 1 deletion cmd/jaeger/internal/exporters/storageexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func createTracesExporter(ctx context.Context, set exporter.CreateSettings, conf
// Disable Timeout/RetryOnFailure and SendingQueue
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(exporterhelper.RetrySettings{Enabled: false}),
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: false}),
// Enable queue settings for Clickhouse only
exporterhelper.WithQueue(exporterhelper.QueueSettings{Enabled: ex.clickhouse}),
exporterhelper.WithStart(ex.start),
exporterhelper.WithShutdown(ex.close),
)
Expand Down
6 changes: 2 additions & 4 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package jaegerstorage

import (
memoryCfg "github.com/jaegertracing/jaeger/pkg/memory/config"
ch "github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
)

// Config has the configuration for jaeger-query,
Expand All @@ -13,9 +14,6 @@ type Config struct {
// TODO add other storage types here
// TODO how will this work with 3rd party storage implementations?
// Option: instead of looking for specific name, check interface.
}

type MemoryStorage struct {
Name string `mapstructure:"name"`
memoryCfg.Configuration
ClickHouse map[string]ch.Config `mapstructure:"clickhouse"`
}
9 changes: 9 additions & 0 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/clickhouse"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage"
)
Expand Down Expand Up @@ -71,6 +72,14 @@ func (s *storageExt) Start(ctx context.Context, host component.Host) error {
)
}
// TODO add support for other backends

for name, chCfg := range s.config.ClickHouse {
if _, ok := s.factories[name]; ok {
return fmt.Errorf("duplicate clickhouse storage name %s", name)
}
s.factories[name] = clickhouse.NewFactory(ctx, chCfg, s.logger.With(zap.String("storage_name", name)))
}

return nil
}

Expand Down
51 changes: 51 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
service:
extensions: [jaeger_storage, jaeger_query]
pipelines:
traces:
receivers: [otlp, jaeger, zipkin]
processors: [batch]
exporters: [jaeger_storage_exporter]

extensions:
# health_check:
# pprof:
# endpoint: 0.0.0.0:1777
# zpages:
# endpoint: 0.0.0.0:55679

jaeger_query:
trace_storage: ch_store
ui_config: ./cmd/jaeger/config-ui.json

jaeger_storage:
memory:
memstore:
max_traces: 100000
memstore_archive:
max_traces: 100000
clickhouse:
ch_store:
endpoint: tcp://127.0.0.1:9000?dial_timeout=10s&compress=lz4
spans_table_name: jaeger_spans

receivers:
otlp:
protocols:
grpc:
http:

jaeger:
protocols:
grpc:
thrift_binary:
thrift_compact:
thrift_http:

zipkin:

processors:
batch:

exporters:
jaeger_storage_exporter:
trace_storage: ch_store
10 changes: 9 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ require (

require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/ClickHouse/ch-go v0.58.2 // indirect
github.com/ClickHouse/clickhouse-go/v2 v2.15.0 // indirect
github.com/IBM/sarama v1.42.1 // indirect
github.com/VividCortex/gohistogram v1.0.0 // indirect
github.com/andybalholm/brotli v1.0.6 // indirect
github.com/aws/aws-sdk-go v1.48.14 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand All @@ -93,13 +96,15 @@ require (
github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/elastic/elastic-transport-go/v8 v8.3.0 // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
Expand Down Expand Up @@ -157,6 +162,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.91.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/openzipkin/zipkin-go v0.4.2 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
Expand All @@ -172,8 +178,10 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.11 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand Down