From 3cf35bdad6f9c64c54f1774cc57a50ebdc372939 Mon Sep 17 00:00:00 2001 From: Chao Weng <19381524+sincejune@users.noreply.github.com> Date: Sat, 29 Jan 2022 00:07:21 +0800 Subject: [PATCH] Add env support for batch span processor (#2515) * Add env support for batch span processor * Update changelog * lint --- CHANGELOG.md | 1 + sdk/trace/batch_span_processor.go | 23 +++++-- sdk/trace/batch_span_processor_test.go | 83 ++++++++++++++++++++++++++ sdk/trace/env.go | 59 ++++++++++++++++++ sdk/trace/env_test.go | 60 +++++++++++++++++++ 5 files changed, 220 insertions(+), 6 deletions(-) create mode 100644 sdk/trace/env.go create mode 100644 sdk/trace/env_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4040dd1b32b..fb91daf7486 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Support `OTEL_EXPORTER_ZIPKIN_ENDPOINT` env to specify zipkin collector endpoint (#2490) - Log the configuration of TracerProviders, and Tracers for debugging. To enable use a logger with Verbosity (V level) >=1 +- Added environment variables for: `OTEL_BSP_SCHEDULE_DELAY`, `OTEL_BSP_EXPORT_TIMEOUT`, `OTEL_BSP_MAX_QUEUE_SIZE` and `OTEL_BSP_MAX_EXPORT_BATCH_SIZE` (#2515) ### Changed diff --git a/sdk/trace/batch_span_processor.go b/sdk/trace/batch_span_processor.go index dda62c452c6..57f346aa08b 100644 --- a/sdk/trace/batch_span_processor.go +++ b/sdk/trace/batch_span_processor.go @@ -29,8 +29,8 @@ import ( // Defaults for BatchSpanProcessorOptions. const ( DefaultMaxQueueSize = 2048 - DefaultBatchTimeout = 5000 * time.Millisecond - DefaultExportTimeout = 30000 * time.Millisecond + DefaultScheduleDelay = 5000 + DefaultExportTimeout = 30000 DefaultMaxExportBatchSize = 512 ) @@ -89,11 +89,22 @@ var _ SpanProcessor = (*batchSpanProcessor)(nil) // // If the exporter is nil, the span processor will preform no action. func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor { + maxQueueSize := intEnvOr(EnvBatchSpanProcessorMaxQueueSize, DefaultMaxQueueSize) + maxExportBatchSize := intEnvOr(EnvBatchSpanProcessorMaxExportBatchSize, DefaultMaxExportBatchSize) + + if maxExportBatchSize > maxQueueSize { + if DefaultMaxExportBatchSize > maxQueueSize { + maxExportBatchSize = maxQueueSize + } else { + maxExportBatchSize = DefaultMaxExportBatchSize + } + } + o := BatchSpanProcessorOptions{ - BatchTimeout: DefaultBatchTimeout, - ExportTimeout: DefaultExportTimeout, - MaxQueueSize: DefaultMaxQueueSize, - MaxExportBatchSize: DefaultMaxExportBatchSize, + BatchTimeout: time.Duration(intEnvOr(EnvBatchSpanProcessorScheduleDelay, DefaultScheduleDelay)) * time.Millisecond, + ExportTimeout: time.Duration(intEnvOr(EnvBatchSpanProcessorExportTimeout, DefaultExportTimeout)) * time.Millisecond, + MaxQueueSize: maxQueueSize, + MaxExportBatchSize: maxExportBatchSize, } for _, opt := range options { opt(&o) diff --git a/sdk/trace/batch_span_processor_test.go b/sdk/trace/batch_span_processor_test.go index f66c6bd1b43..f361b535d0f 100644 --- a/sdk/trace/batch_span_processor_test.go +++ b/sdk/trace/batch_span_processor_test.go @@ -19,10 +19,13 @@ import ( "encoding/binary" "errors" "fmt" + "os" "sync" "testing" "time" + ottest "go.opentelemetry.io/otel/internal/internaltest" + "github.com/go-logr/logr/funcr" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -115,6 +118,7 @@ type testOption struct { wantBatchCount int genNumSpans int parallel bool + envs map[string]string } func TestNewBatchSpanProcessorWithOptions(t *testing.T) { @@ -221,6 +225,85 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) { } } +func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) { + options := []testOption{ + { + name: "BatchSpanProcessorEnvOptions - Basic", + wantNumSpans: 2053, + wantBatchCount: 1, + genNumSpans: 2053, + envs: map[string]string{ + sdktrace.EnvBatchSpanProcessorMaxQueueSize: "5000", + sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "5000", + }, + }, + { + name: "BatchSpanProcessorEnvOptions - A lager max export batch size than queue size", + wantNumSpans: 2053, + wantBatchCount: 4, + genNumSpans: 2053, + envs: map[string]string{ + sdktrace.EnvBatchSpanProcessorMaxQueueSize: "5000", + sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "10000", + }, + }, + { + name: "BatchSpanProcessorEnvOptions - A lage max export batch size with a small queue size", + wantNumSpans: 2053, + wantBatchCount: 42, + genNumSpans: 2053, + envs: map[string]string{ + sdktrace.EnvBatchSpanProcessorMaxQueueSize: "50", + sdktrace.EnvBatchSpanProcessorMaxExportBatchSize: "10000", + }, + }, + } + + envStore := ottest.NewEnvStore() + envStore.Record(sdktrace.EnvBatchSpanProcessorScheduleDelay) + envStore.Record(sdktrace.EnvBatchSpanProcessorExportTimeout) + envStore.Record(sdktrace.EnvBatchSpanProcessorMaxQueueSize) + envStore.Record(sdktrace.EnvBatchSpanProcessorMaxExportBatchSize) + + defer func() { + require.NoError(t, envStore.Restore()) + }() + + for _, option := range options { + t.Run(option.name, func(t *testing.T) { + for k, v := range option.envs { + require.NoError(t, os.Setenv(k, v)) + } + + te := testBatchExporter{} + tp := basicTracerProvider(t) + ssp := createAndRegisterBatchSP(option, &te) + if ssp == nil { + t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name) + } + tp.RegisterSpanProcessor(ssp) + tr := tp.Tracer("BatchSpanProcessorWithOptions") + + generateSpan(t, option.parallel, tr, option) + + tp.UnregisterSpanProcessor(ssp) + + gotNumOfSpans := te.len() + if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans { + t.Errorf("number of exported span: got %+v, want %+v\n", + gotNumOfSpans, option.wantNumSpans) + } + + gotBatchCount := te.getBatchCount() + if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount { + t.Errorf("number batches: got %+v, want >= %+v\n", + gotBatchCount, option.wantBatchCount) + t.Errorf("Batches %v\n", te.sizes) + } + }) + } +} + type stuckExporter struct { testBatchExporter } diff --git a/sdk/trace/env.go b/sdk/trace/env.go new file mode 100644 index 00000000000..da7ea412e53 --- /dev/null +++ b/sdk/trace/env.go @@ -0,0 +1,59 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace // import "go.opentelemetry.io/otel/sdk/trace" + +import ( + "os" + "strconv" + + "go.opentelemetry.io/otel/internal/global" +) + +// Environment variable names +const ( + // EnvBatchSpanProcessorScheduleDelay + // Delay interval between two consecutive exports. + // i.e. 5000 + EnvBatchSpanProcessorScheduleDelay = "OTEL_BSP_SCHEDULE_DELAY" + // EnvBatchSpanProcessorExportTimeout + // Maximum allowed time to export data. + // i.e. 3000 + EnvBatchSpanProcessorExportTimeout = "OTEL_BSP_EXPORT_TIMEOUT" + // EnvBatchSpanProcessorMaxQueueSize + // Maximum queue size + // i.e. 2048 + EnvBatchSpanProcessorMaxQueueSize = "OTEL_BSP_MAX_QUEUE_SIZE" + // EnvBatchSpanProcessorMaxExportBatchSize + // Maximum batch size + // Note: Must be less than or equal to EnvBatchSpanProcessorMaxQueueSize + // i.e. 512 + EnvBatchSpanProcessorMaxExportBatchSize = "OTEL_BSP_MAX_EXPORT_BATCH_SIZE" +) + +// intEnvOr returns an env variable's numeric value if it is exists (and valid) or the default if not +func intEnvOr(key string, defaultValue int) int { + value, ok := os.LookupEnv(key) + if !ok { + return defaultValue + } + + intValue, err := strconv.Atoi(value) + if err != nil { + global.Info("Got invalid value, number value expected.", key, value) + return defaultValue + } + + return intValue +} diff --git a/sdk/trace/env_test.go b/sdk/trace/env_test.go new file mode 100644 index 00000000000..be37eae5744 --- /dev/null +++ b/sdk/trace/env_test.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package trace + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ottest "go.opentelemetry.io/otel/internal/internaltest" +) + +func TestIntEnvOr(t *testing.T) { + testCases := []struct { + name string + envValue string + defaultValue int + expectedValue int + }{ + { + name: "IntEnvOrTest - Basic", + envValue: "2500", + defaultValue: 500, + expectedValue: 2500, + }, + { + name: "IntEnvOrTest - Invalid Number", + envValue: "localhost", + defaultValue: 500, + expectedValue: 500, + }, + } + + envStore := ottest.NewEnvStore() + envStore.Record(EnvBatchSpanProcessorMaxQueueSize) + defer func() { + require.NoError(t, envStore.Restore()) + }() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, os.Setenv(EnvBatchSpanProcessorMaxQueueSize, tc.envValue)) + actualValue := intEnvOr(EnvBatchSpanProcessorMaxQueueSize, tc.defaultValue) + assert.Equal(t, tc.expectedValue, actualValue) + }) + } +}