diff --git a/.chloggen/move-tracing-init.yaml b/.chloggen/move-tracing-init.yaml new file mode 100644 index 00000000000..0a3c005e2d9 --- /dev/null +++ b/.chloggen/move-tracing-init.yaml @@ -0,0 +1,11 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service/telemetry + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Move logging and tracing initialization to service/telemetry" + +# One or more tracking issues or pull requests related to the change +issues: [5564] diff --git a/service/collector.go b/service/collector.go index 30081c48b38..5b06bf65aef 100644 --- a/service/collector.go +++ b/service/collector.go @@ -31,7 +31,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/ballastextension" "go.opentelemetry.io/collector/featuregate" - "go.opentelemetry.io/collector/service/internal/telemetrylogs" + "go.opentelemetry.io/collector/service/internal/grpclog" ) // State defines Collector's state. @@ -145,7 +145,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } if !col.set.SkipSettingGRPCLogger { - telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level) + grpclog.SetLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level) } if err = col.service.Start(ctx); err != nil { diff --git a/service/internal/telemetrylogs/logger.go b/service/internal/grpclog/logger.go similarity index 55% rename from service/internal/telemetrylogs/logger.go rename to service/internal/grpclog/logger.go index ca1e1893a23..e94d50747d2 100644 --- a/service/internal/telemetrylogs/logger.go +++ b/service/internal/grpclog/logger.go @@ -12,52 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetrylogs // import "go.opentelemetry.io/collector/service/internal/telemetrylogs" +package grpclog // import "go.opentelemetry.io/collector/service/internal/grpclog" import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zapgrpc" "google.golang.org/grpc/grpclog" - - "go.opentelemetry.io/collector/service/telemetry" ) -func NewLogger(cfg telemetry.LogsConfig, options []zap.Option) (*zap.Logger, error) { - // Copied from NewProductionConfig. - zapCfg := &zap.Config{ - Level: zap.NewAtomicLevelAt(cfg.Level), - Development: cfg.Development, - Sampling: &zap.SamplingConfig{ - Initial: 100, - Thereafter: 100, - }, - Encoding: cfg.Encoding, - EncoderConfig: zap.NewProductionEncoderConfig(), - OutputPaths: cfg.OutputPaths, - ErrorOutputPaths: cfg.ErrorOutputPaths, - DisableCaller: cfg.DisableCaller, - DisableStacktrace: cfg.DisableStacktrace, - InitialFields: cfg.InitialFields, - } - - if zapCfg.Encoding == "console" { - // Human-readable timestamps for console format of logs. - zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - } - - logger, err := zapCfg.Build(options...) - if err != nil { - return nil, err - } - - return logger, nil -} - -// SetColGRPCLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with +// SetLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with // exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to // INFO to avoid copious logging from grpc framework. -func SetColGRPCLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger { +func SetLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger { logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { var c zapcore.Core var err error diff --git a/service/internal/telemetrylogs/logger_test.go b/service/internal/grpclog/logger_test.go similarity index 81% rename from service/internal/telemetrylogs/logger_test.go rename to service/internal/grpclog/logger_test.go index 0e98dda6603..19cd3037dfa 100644 --- a/service/internal/telemetrylogs/logger_test.go +++ b/service/internal/grpclog/logger_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetrylogs +package grpclog import ( "testing" @@ -20,21 +20,19 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.uber.org/zap/zapcore" - - "go.opentelemetry.io/collector/service/telemetry" ) func TestGRPCLogger(t *testing.T) { tests := []struct { name string - cfg telemetry.LogsConfig + cfg zap.Config infoLogged bool warnLogged bool }{ { "collector_info_level_grpc_log_warn", - telemetry.LogsConfig{ - Level: zapcore.InfoLevel, + zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.InfoLevel), Encoding: "console", }, false, @@ -42,8 +40,8 @@ func TestGRPCLogger(t *testing.T) { }, { "collector_debug_level_grpc_log_debug", - telemetry.LogsConfig{ - Level: zapcore.DebugLevel, + zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), Encoding: "console", }, true, @@ -51,9 +49,9 @@ func TestGRPCLogger(t *testing.T) { }, { "collector_warn_level_grpc_log_warn", - telemetry.LogsConfig{ + zap.Config{ Development: false, // this must set the grpc loggerV2 to loggerV2 - Level: zapcore.WarnLevel, + Level: zap.NewAtomicLevelAt(zapcore.WarnLevel), Encoding: "console", }, false, @@ -74,11 +72,11 @@ func TestGRPCLogger(t *testing.T) { }) // create new collector zap logger - logger, err := NewLogger(test.cfg, []zap.Option{hook}) + logger, err := test.cfg.Build(hook) assert.NoError(t, err) // create colGRPCLogger - glogger := SetColGRPCLogger(logger, test.cfg.Level) + glogger := SetLogger(logger, test.cfg.Level.Level()) assert.NotNil(t, glogger) glogger.Info(test.name) diff --git a/service/internal/telemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go similarity index 98% rename from service/internal/telemetry/process_telemetry.go rename to service/internal/proctelemetry/process_telemetry.go index b6d0a9324f1..edb9a9d80ac 100644 --- a/service/internal/telemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry" +package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry" import ( "os" diff --git a/service/internal/telemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go similarity index 99% rename from service/internal/telemetry/process_telemetry_test.go rename to service/internal/proctelemetry/process_telemetry_test.go index 6864416016e..26435a78a9b 100644 --- a/service/internal/telemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetry +package proctelemetry import ( "testing" diff --git a/service/service.go b/service/service.go index 6304fc472f2..eca81b427cc 100644 --- a/service/service.go +++ b/service/service.go @@ -20,23 +20,22 @@ import ( "runtime" "go.opentelemetry.io/otel/metric" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/service/extensions" - "go.opentelemetry.io/collector/service/internal" "go.opentelemetry.io/collector/service/internal/pipelines" - "go.opentelemetry.io/collector/service/internal/telemetry" - "go.opentelemetry.io/collector/service/internal/telemetrylogs" + "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/telemetry" ) // service represents the implementation of a component.Host. type service struct { buildInfo component.BuildInfo config *Config + telemetry *telemetry.Telemetry telemetrySettings component.TelemetrySettings host *serviceHost telemetryInitializer *telemetryInitializer @@ -46,15 +45,6 @@ func newService(set *settings) (*service, error) { srv := &service{ buildInfo: set.BuildInfo, config: set.Config, - telemetrySettings: component.TelemetrySettings{ - Logger: zap.NewNop(), - TracerProvider: sdktrace.NewTracerProvider( - // needed for supporting the zpages extension - sdktrace.WithSampler(internal.AlwaysRecord()), - ), - MeterProvider: metric.NewNoopMeterProvider(), - MetricsLevel: set.Config.Telemetry.Metrics.Level, - }, host: &serviceHost{ factories: set.Factories, buildInfo: set.BuildInfo, @@ -64,9 +54,17 @@ func newService(set *settings) (*service, error) { } var err error - if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil { + srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{ + ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry) + if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } + srv.telemetrySettings = component.TelemetrySettings{ + Logger: srv.telemetry.Logger(), + TracerProvider: srv.telemetry.TracerProvider(), + MeterProvider: metric.NewNoopMeterProvider(), + MetricsLevel: set.Config.Service.Telemetry.Metrics.Level, + } if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil { return nil, fmt.Errorf("failed to initialize telemetry: %w", err) @@ -128,7 +126,12 @@ func (srv *service) Shutdown(ctx context.Context) error { } srv.telemetrySettings.Logger.Info("Shutdown complete.") - // TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger. + + if err := srv.telemetry.Shutdown(ctx); err != nil { + errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err)) + } + + // TODO: Shutdown MeterProvider. return errs } @@ -161,7 +164,7 @@ func (srv *service) initExtensionsAndPipeline(set *settings) error { if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { + if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { return fmt.Errorf("failed to register process metrics: %w", err) } } diff --git a/service/internal/otel_trace_sampler.go b/service/telemetry/otel_trace_sampler.go similarity index 91% rename from service/internal/otel_trace_sampler.go rename to service/telemetry/otel_trace_sampler.go index c659a1e1794..f8e6f0716fc 100644 --- a/service/internal/otel_trace_sampler.go +++ b/service/telemetry/otel_trace_sampler.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal // import "go.opentelemetry.io/collector/service/internal" +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -28,7 +28,7 @@ func (r recordSampler) Description() string { return "Always record sampler" } -func AlwaysRecord() sdktrace.Sampler { +func alwaysRecord() sdktrace.Sampler { rs := &recordSampler{} return sdktrace.ParentBased( rs, diff --git a/service/telemetry/telemetry.go b/service/telemetry/telemetry.go new file mode 100644 index 00000000000..7aba6986309 --- /dev/null +++ b/service/telemetry/telemetry.go @@ -0,0 +1,117 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry // import "go.opentelemetry.io/collector/service/telemetry" + +import ( + "context" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type Telemetry struct { + logger *zap.Logger + tracerProvider *sdktrace.TracerProvider +} + +func (t *Telemetry) TracerProvider() trace.TracerProvider { + return t.tracerProvider +} + +func (t *Telemetry) Logger() *zap.Logger { + return t.logger +} + +func (t *Telemetry) Shutdown(ctx context.Context) error { + // TODO: Sync logger. + return multierr.Combine( + t.tracerProvider.Shutdown(ctx), + ) +} + +// Settings holds configuration for building Telemetry. +type Settings struct { + ZapOptions []zap.Option +} + +// New creates a new Telemetry from Config. +func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) { + logger, err := newLogger(cfg.Logs, set.ZapOptions) + if err != nil { + return nil, err + } + tp := sdktrace.NewTracerProvider( + // needed for supporting the zpages extension + sdktrace.WithSampler(alwaysRecord()), + ) + // TODO: Remove when https://github.com/open-telemetry/opentelemetry-go/pull/3268 released. + // For the moment, register and unregister so shutdown does not fail. + sp := &nopSpanProcessor{} + tp.RegisterSpanProcessor(sp) + tp.UnregisterSpanProcessor(sp) + return &Telemetry{ + logger: logger, + tracerProvider: tp, + }, nil +} + +func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { + // Copied from NewProductionConfig. + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(cfg.Level), + Development: cfg.Development, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + Encoding: cfg.Encoding, + EncoderConfig: zap.NewProductionEncoderConfig(), + OutputPaths: cfg.OutputPaths, + ErrorOutputPaths: cfg.ErrorOutputPaths, + DisableCaller: cfg.DisableCaller, + DisableStacktrace: cfg.DisableStacktrace, + InitialFields: cfg.InitialFields, + } + + if zapCfg.Encoding == "console" { + // Human-readable timestamps for console format of logs. + zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + } + + logger, err := zapCfg.Build(options...) + if err != nil { + return nil, err + } + + return logger, nil +} + +type nopSpanProcessor struct { +} + +func (n nopSpanProcessor) OnStart(context.Context, sdktrace.ReadWriteSpan) {} + +func (n nopSpanProcessor) OnEnd(sdktrace.ReadOnlySpan) {} + +func (n nopSpanProcessor) Shutdown(context.Context) error { + return nil +} + +func (n nopSpanProcessor) ForceFlush(context.Context) error { + return nil +}