Skip to content

Commit

Permalink
Start moving telemetry initialization to service/telemetry (#6275)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan <bogdandrutu@gmail.com>

Signed-off-by: Bogdan <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 13, 2022
1 parent f24a6ac commit dfb21cf
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 70 deletions.
11 changes: 11 additions & 0 deletions .chloggen/move-tracing-init.yaml
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/telemetry

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Move logging and tracing initialization to service/telemetry"

# One or more tracking issues or pull requests related to the change
issues: [5564]
4 changes: 2 additions & 2 deletions service/collector.go
Expand Up @@ -31,7 +31,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/internal/grpclog"
)

// State defines Collector's state.
Expand Down Expand Up @@ -145,7 +145,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
grpclog.SetLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
}

if err = col.service.Start(ctx); err != nil {
Expand Down
Expand Up @@ -12,52 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrylogs // import "go.opentelemetry.io/collector/service/internal/telemetrylogs"
package grpclog // import "go.opentelemetry.io/collector/service/internal/grpclog"

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc/grpclog"

"go.opentelemetry.io/collector/service/telemetry"
)

func NewLogger(cfg telemetry.LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
ErrorOutputPaths: cfg.ErrorOutputPaths,
DisableCaller: cfg.DisableCaller,
DisableStacktrace: cfg.DisableStacktrace,
InitialFields: cfg.InitialFields,
}

if zapCfg.Encoding == "console" {
// Human-readable timestamps for console format of logs.
zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}

logger, err := zapCfg.Build(options...)
if err != nil {
return nil, err
}

return logger, nil
}

// SetColGRPCLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with
// SetLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with
// exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to
// INFO to avoid copious logging from grpc framework.
func SetColGRPCLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger {
func SetLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger {
logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
var c zapcore.Core
var err error
Expand Down
Expand Up @@ -12,48 +12,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrylogs
package grpclog

import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/service/telemetry"
)

func TestGRPCLogger(t *testing.T) {
tests := []struct {
name string
cfg telemetry.LogsConfig
cfg zap.Config
infoLogged bool
warnLogged bool
}{
{
"collector_info_level_grpc_log_warn",
telemetry.LogsConfig{
Level: zapcore.InfoLevel,
zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.InfoLevel),
Encoding: "console",
},
false,
true,
},
{
"collector_debug_level_grpc_log_debug",
telemetry.LogsConfig{
Level: zapcore.DebugLevel,
zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
Encoding: "console",
},
true,
true,
},
{
"collector_warn_level_grpc_log_warn",
telemetry.LogsConfig{
zap.Config{
Development: false, // this must set the grpc loggerV2 to loggerV2
Level: zapcore.WarnLevel,
Level: zap.NewAtomicLevelAt(zapcore.WarnLevel),
Encoding: "console",
},
false,
Expand All @@ -74,11 +72,11 @@ func TestGRPCLogger(t *testing.T) {
})

// create new collector zap logger
logger, err := NewLogger(test.cfg, []zap.Option{hook})
logger, err := test.cfg.Build(hook)
assert.NoError(t, err)

// create colGRPCLogger
glogger := SetColGRPCLogger(logger, test.cfg.Level)
glogger := SetLogger(logger, test.cfg.Level.Level())
assert.NotNil(t, glogger)

glogger.Info(test.name)
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry"
package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"

import (
"os"
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry
package proctelemetry

import (
"testing"
Expand Down
35 changes: 19 additions & 16 deletions service/service.go
Expand Up @@ -20,23 +20,22 @@ import (
"runtime"

"go.opentelemetry.io/otel/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/telemetry"
)

// service represents the implementation of a component.Host.
type service struct {
buildInfo component.BuildInfo
config *Config
telemetry *telemetry.Telemetry
telemetrySettings component.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
Expand All @@ -46,15 +45,6 @@ func newService(set *settings) (*service, error) {
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
telemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: set.Config.Telemetry.Metrics.Level,
},
host: &serviceHost{
factories: set.Factories,
buildInfo: set.BuildInfo,
Expand All @@ -64,9 +54,17 @@ func newService(set *settings) (*service, error) {
}

var err error
if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{
ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
srv.telemetrySettings = component.TelemetrySettings{
Logger: srv.telemetry.Logger(),
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: set.Config.Service.Telemetry.Metrics.Level,
}

if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
Expand Down Expand Up @@ -128,7 +126,12 @@ func (srv *service) Shutdown(ctx context.Context) error {
}

srv.telemetrySettings.Logger.Info("Shutdown complete.")
// TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger.

if err := srv.telemetry.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
}

// TODO: Shutdown MeterProvider.
return errs
}

Expand Down Expand Up @@ -161,7 +164,7 @@ func (srv *service) initExtensionsAndPipeline(set *settings) error {

if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/collector/service/internal"
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -28,7 +28,7 @@ func (r recordSampler) Description() string {
return "Always record sampler"
}

func AlwaysRecord() sdktrace.Sampler {
func alwaysRecord() sdktrace.Sampler {
rs := &recordSampler{}
return sdktrace.ParentBased(
rs,
Expand Down
117 changes: 117 additions & 0 deletions service/telemetry/telemetry.go
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Telemetry struct {
logger *zap.Logger
tracerProvider *sdktrace.TracerProvider
}

func (t *Telemetry) TracerProvider() trace.TracerProvider {
return t.tracerProvider
}

func (t *Telemetry) Logger() *zap.Logger {
return t.logger
}

func (t *Telemetry) Shutdown(ctx context.Context) error {
// TODO: Sync logger.
return multierr.Combine(
t.tracerProvider.Shutdown(ctx),
)
}

// Settings holds configuration for building Telemetry.
type Settings struct {
ZapOptions []zap.Option
}

// New creates a new Telemetry from Config.
func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) {
logger, err := newLogger(cfg.Logs, set.ZapOptions)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(alwaysRecord()),
)
// TODO: Remove when https://github.com/open-telemetry/opentelemetry-go/pull/3268 released.
// For the moment, register and unregister so shutdown does not fail.
sp := &nopSpanProcessor{}
tp.RegisterSpanProcessor(sp)
tp.UnregisterSpanProcessor(sp)
return &Telemetry{
logger: logger,
tracerProvider: tp,
}, nil
}

func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
ErrorOutputPaths: cfg.ErrorOutputPaths,
DisableCaller: cfg.DisableCaller,
DisableStacktrace: cfg.DisableStacktrace,
InitialFields: cfg.InitialFields,
}

if zapCfg.Encoding == "console" {
// Human-readable timestamps for console format of logs.
zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}

logger, err := zapCfg.Build(options...)
if err != nil {
return nil, err
}

return logger, nil
}

type nopSpanProcessor struct {
}

func (n nopSpanProcessor) OnStart(context.Context, sdktrace.ReadWriteSpan) {}

func (n nopSpanProcessor) OnEnd(sdktrace.ReadOnlySpan) {}

func (n nopSpanProcessor) Shutdown(context.Context) error {
return nil
}

func (n nopSpanProcessor) ForceFlush(context.Context) error {
return nil
}

0 comments on commit dfb21cf

Please sign in to comment.