diff --git a/service/collector.go b/service/collector.go index 30081c48b38..5b06bf65aef 100644 --- a/service/collector.go +++ b/service/collector.go @@ -31,7 +31,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/ballastextension" "go.opentelemetry.io/collector/featuregate" - "go.opentelemetry.io/collector/service/internal/telemetrylogs" + "go.opentelemetry.io/collector/service/internal/grpclog" ) // State defines Collector's state. @@ -145,7 +145,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error { } if !col.set.SkipSettingGRPCLogger { - telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level) + grpclog.SetLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level) } if err = col.service.Start(ctx); err != nil { diff --git a/service/internal/grpclog/logger.go b/service/internal/grpclog/logger.go new file mode 100644 index 00000000000..a56c172b1c2 --- /dev/null +++ b/service/internal/grpclog/logger.go @@ -0,0 +1,31 @@ +package grpclog + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zapgrpc" + "google.golang.org/grpc/grpclog" +) + +// SetLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with +// exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to +// INFO to avoid copious logging from grpc framework. +func SetLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger { + logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { + var c zapcore.Core + var err error + if loglevel == zapcore.InfoLevel { + loglevel = zapcore.WarnLevel + } + // NewIncreaseLevelCore errors only if the new log level is less than the initial core level. + c, err = zapcore.NewIncreaseLevelCore(core, loglevel) + // In case of an error changing the level, move on, this happens when using the NopCore + if err != nil { + c = core + } + return c.With([]zapcore.Field{zap.Bool("grpc_log", true)}) + }))) + + grpclog.SetLoggerV2(logger) + return logger +} diff --git a/service/internal/telemetrylogs/logger_test.go b/service/internal/grpclog/logger_test.go similarity index 81% rename from service/internal/telemetrylogs/logger_test.go rename to service/internal/grpclog/logger_test.go index 0e98dda6603..19cd3037dfa 100644 --- a/service/internal/telemetrylogs/logger_test.go +++ b/service/internal/grpclog/logger_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetrylogs +package grpclog import ( "testing" @@ -20,21 +20,19 @@ import ( "github.com/stretchr/testify/assert" "go.uber.org/zap" "go.uber.org/zap/zapcore" - - "go.opentelemetry.io/collector/service/telemetry" ) func TestGRPCLogger(t *testing.T) { tests := []struct { name string - cfg telemetry.LogsConfig + cfg zap.Config infoLogged bool warnLogged bool }{ { "collector_info_level_grpc_log_warn", - telemetry.LogsConfig{ - Level: zapcore.InfoLevel, + zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.InfoLevel), Encoding: "console", }, false, @@ -42,8 +40,8 @@ func TestGRPCLogger(t *testing.T) { }, { "collector_debug_level_grpc_log_debug", - telemetry.LogsConfig{ - Level: zapcore.DebugLevel, + zap.Config{ + Level: zap.NewAtomicLevelAt(zapcore.DebugLevel), Encoding: "console", }, true, @@ -51,9 +49,9 @@ func TestGRPCLogger(t *testing.T) { }, { "collector_warn_level_grpc_log_warn", - telemetry.LogsConfig{ + zap.Config{ Development: false, // this must set the grpc loggerV2 to loggerV2 - Level: zapcore.WarnLevel, + Level: zap.NewAtomicLevelAt(zapcore.WarnLevel), Encoding: "console", }, false, @@ -74,11 +72,11 @@ func TestGRPCLogger(t *testing.T) { }) // create new collector zap logger - logger, err := NewLogger(test.cfg, []zap.Option{hook}) + logger, err := test.cfg.Build(hook) assert.NoError(t, err) // create colGRPCLogger - glogger := SetColGRPCLogger(logger, test.cfg.Level) + glogger := SetLogger(logger, test.cfg.Level.Level()) assert.NotNil(t, glogger) glogger.Info(test.name) diff --git a/service/internal/telemetry/process_telemetry.go b/service/internal/proctelemetry/process_telemetry.go similarity index 98% rename from service/internal/telemetry/process_telemetry.go rename to service/internal/proctelemetry/process_telemetry.go index b6d0a9324f1..edb9a9d80ac 100644 --- a/service/internal/telemetry/process_telemetry.go +++ b/service/internal/proctelemetry/process_telemetry.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry" +package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry" import ( "os" diff --git a/service/internal/telemetry/process_telemetry_test.go b/service/internal/proctelemetry/process_telemetry_test.go similarity index 99% rename from service/internal/telemetry/process_telemetry_test.go rename to service/internal/proctelemetry/process_telemetry_test.go index 6864416016e..26435a78a9b 100644 --- a/service/internal/telemetry/process_telemetry_test.go +++ b/service/internal/proctelemetry/process_telemetry_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package telemetry +package proctelemetry import ( "testing" diff --git a/service/internal/telemetrylogs/logger.go b/service/internal/telemetrylogs/logger.go deleted file mode 100644 index ca1e1893a23..00000000000 --- a/service/internal/telemetrylogs/logger.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package telemetrylogs // import "go.opentelemetry.io/collector/service/internal/telemetrylogs" - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zapgrpc" - "google.golang.org/grpc/grpclog" - - "go.opentelemetry.io/collector/service/telemetry" -) - -func NewLogger(cfg telemetry.LogsConfig, options []zap.Option) (*zap.Logger, error) { - // Copied from NewProductionConfig. - zapCfg := &zap.Config{ - Level: zap.NewAtomicLevelAt(cfg.Level), - Development: cfg.Development, - Sampling: &zap.SamplingConfig{ - Initial: 100, - Thereafter: 100, - }, - Encoding: cfg.Encoding, - EncoderConfig: zap.NewProductionEncoderConfig(), - OutputPaths: cfg.OutputPaths, - ErrorOutputPaths: cfg.ErrorOutputPaths, - DisableCaller: cfg.DisableCaller, - DisableStacktrace: cfg.DisableStacktrace, - InitialFields: cfg.InitialFields, - } - - if zapCfg.Encoding == "console" { - // Human-readable timestamps for console format of logs. - zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - } - - logger, err := zapCfg.Build(options...) - if err != nil { - return nil, err - } - - return logger, nil -} - -// SetColGRPCLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with -// exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to -// INFO to avoid copious logging from grpc framework. -func SetColGRPCLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger { - logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core { - var c zapcore.Core - var err error - if loglevel == zapcore.InfoLevel { - loglevel = zapcore.WarnLevel - } - // NewIncreaseLevelCore errors only if the new log level is less than the initial core level. - c, err = zapcore.NewIncreaseLevelCore(core, loglevel) - // In case of an error changing the level, move on, this happens when using the NopCore - if err != nil { - c = core - } - return c.With([]zapcore.Field{zap.Bool("grpc_log", true)}) - }))) - - grpclog.SetLoggerV2(logger) - return logger -} diff --git a/service/service.go b/service/service.go index 6304fc472f2..eca81b427cc 100644 --- a/service/service.go +++ b/service/service.go @@ -20,23 +20,22 @@ import ( "runtime" "go.opentelemetry.io/otel/metric" - sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.uber.org/multierr" "go.uber.org/zap" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/service/extensions" - "go.opentelemetry.io/collector/service/internal" "go.opentelemetry.io/collector/service/internal/pipelines" - "go.opentelemetry.io/collector/service/internal/telemetry" - "go.opentelemetry.io/collector/service/internal/telemetrylogs" + "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/telemetry" ) // service represents the implementation of a component.Host. type service struct { buildInfo component.BuildInfo config *Config + telemetry *telemetry.Telemetry telemetrySettings component.TelemetrySettings host *serviceHost telemetryInitializer *telemetryInitializer @@ -46,15 +45,6 @@ func newService(set *settings) (*service, error) { srv := &service{ buildInfo: set.BuildInfo, config: set.Config, - telemetrySettings: component.TelemetrySettings{ - Logger: zap.NewNop(), - TracerProvider: sdktrace.NewTracerProvider( - // needed for supporting the zpages extension - sdktrace.WithSampler(internal.AlwaysRecord()), - ), - MeterProvider: metric.NewNoopMeterProvider(), - MetricsLevel: set.Config.Telemetry.Metrics.Level, - }, host: &serviceHost{ factories: set.Factories, buildInfo: set.BuildInfo, @@ -64,9 +54,17 @@ func newService(set *settings) (*service, error) { } var err error - if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil { + srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{ + ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry) + if err != nil { return nil, fmt.Errorf("failed to get logger: %w", err) } + srv.telemetrySettings = component.TelemetrySettings{ + Logger: srv.telemetry.Logger(), + TracerProvider: srv.telemetry.TracerProvider(), + MeterProvider: metric.NewNoopMeterProvider(), + MetricsLevel: set.Config.Service.Telemetry.Metrics.Level, + } if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil { return nil, fmt.Errorf("failed to initialize telemetry: %w", err) @@ -128,7 +126,12 @@ func (srv *service) Shutdown(ctx context.Context) error { } srv.telemetrySettings.Logger.Info("Shutdown complete.") - // TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger. + + if err := srv.telemetry.Shutdown(ctx); err != nil { + errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err)) + } + + // TODO: Shutdown MeterProvider. return errs } @@ -161,7 +164,7 @@ func (srv *service) initExtensionsAndPipeline(set *settings) error { if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" { // The process telemetry initialization requires the ballast size, which is available after the extensions are initialized. - if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { + if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil { return fmt.Errorf("failed to register process metrics: %w", err) } } diff --git a/service/internal/otel_trace_sampler.go b/service/telemetry/otel_trace_sampler.go similarity index 91% rename from service/internal/otel_trace_sampler.go rename to service/telemetry/otel_trace_sampler.go index c659a1e1794..813128db458 100644 --- a/service/internal/otel_trace_sampler.go +++ b/service/telemetry/otel_trace_sampler.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal // import "go.opentelemetry.io/collector/service/internal" +package telemetry // import "go.opentelemetry.io/collector/service/internal" import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" @@ -28,7 +28,7 @@ func (r recordSampler) Description() string { return "Always record sampler" } -func AlwaysRecord() sdktrace.Sampler { +func alwaysRecord() sdktrace.Sampler { rs := &recordSampler{} return sdktrace.ParentBased( rs, diff --git a/service/telemetry/telemetry.go b/service/telemetry/telemetry.go new file mode 100644 index 00000000000..d1823ca277e --- /dev/null +++ b/service/telemetry/telemetry.go @@ -0,0 +1,117 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package telemetry + +import ( + "context" + + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" + "go.uber.org/multierr" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type Telemetry struct { + logger *zap.Logger + tracerProvider *sdktrace.TracerProvider +} + +func (t *Telemetry) TracerProvider() trace.TracerProvider { + return t.tracerProvider +} + +func (t *Telemetry) Logger() *zap.Logger { + return t.logger +} + +func (t *Telemetry) Shutdown(ctx context.Context) error { + // TODO: Sync logger. + return multierr.Combine( + t.tracerProvider.Shutdown(ctx), + ) +} + +// Settings holds configuration for building Extensions. +type Settings struct { + ZapOptions []zap.Option +} + +// New creates a new Extensions from Config. +func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) { + logger, err := newLogger(cfg.Logs, set.ZapOptions) + if err != nil { + return nil, err + } + tp := sdktrace.NewTracerProvider( + // needed for supporting the zpages extension + sdktrace.WithSampler(alwaysRecord()), + ) + // TODO: Remove when https://github.com/open-telemetry/opentelemetry-go/pull/3268 released. + // For the moment, register and unregister so shutdown does not fail. + sp := &nopSpanProcessor{} + tp.RegisterSpanProcessor(sp) + tp.UnregisterSpanProcessor(sp) + return &Telemetry{ + logger: logger, + tracerProvider: tp, + }, nil +} + +func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) { + // Copied from NewProductionConfig. + zapCfg := &zap.Config{ + Level: zap.NewAtomicLevelAt(cfg.Level), + Development: cfg.Development, + Sampling: &zap.SamplingConfig{ + Initial: 100, + Thereafter: 100, + }, + Encoding: cfg.Encoding, + EncoderConfig: zap.NewProductionEncoderConfig(), + OutputPaths: cfg.OutputPaths, + ErrorOutputPaths: cfg.ErrorOutputPaths, + DisableCaller: cfg.DisableCaller, + DisableStacktrace: cfg.DisableStacktrace, + InitialFields: cfg.InitialFields, + } + + if zapCfg.Encoding == "console" { + // Human-readable timestamps for console format of logs. + zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + } + + logger, err := zapCfg.Build(options...) + if err != nil { + return nil, err + } + + return logger, nil +} + +type nopSpanProcessor struct { +} + +func (n nopSpanProcessor) OnStart(context.Context, sdktrace.ReadWriteSpan) {} + +func (n nopSpanProcessor) OnEnd(sdktrace.ReadOnlySpan) {} + +func (n nopSpanProcessor) Shutdown(context.Context) error { + return nil +} + +func (n nopSpanProcessor) ForceFlush(context.Context) error { + return nil +}