Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start moving telemetry initialization to service/telemetry #6275

Merged
merged 1 commit into from Oct 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions .chloggen/move-tracing-init.yaml
@@ -0,0 +1,11 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: service/telemetry

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Move logging and tracing initialization to service/telemetry"

# One or more tracking issues or pull requests related to the change
issues: [5564]
4 changes: 2 additions & 2 deletions service/collector.go
Expand Up @@ -31,7 +31,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/internal/grpclog"
)

// State defines Collector's state.
Expand Down Expand Up @@ -145,7 +145,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
grpclog.SetLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
}

if err = col.service.Start(ctx); err != nil {
Expand Down
Expand Up @@ -12,52 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrylogs // import "go.opentelemetry.io/collector/service/internal/telemetrylogs"
package grpclog // import "go.opentelemetry.io/collector/service/internal/grpclog"

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc/grpclog"

"go.opentelemetry.io/collector/service/telemetry"
)

func NewLogger(cfg telemetry.LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
ErrorOutputPaths: cfg.ErrorOutputPaths,
DisableCaller: cfg.DisableCaller,
DisableStacktrace: cfg.DisableStacktrace,
InitialFields: cfg.InitialFields,
}

if zapCfg.Encoding == "console" {
// Human-readable timestamps for console format of logs.
zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}

logger, err := zapCfg.Build(options...)
if err != nil {
return nil, err
}

return logger, nil
}

// SetColGRPCLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with
// SetLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with
// exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to
// INFO to avoid copious logging from grpc framework.
func SetColGRPCLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger {
func SetLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger {
logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
var c zapcore.Core
var err error
Expand Down
Expand Up @@ -12,48 +12,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrylogs
package grpclog

import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/service/telemetry"
)

func TestGRPCLogger(t *testing.T) {
tests := []struct {
name string
cfg telemetry.LogsConfig
cfg zap.Config
infoLogged bool
warnLogged bool
}{
{
"collector_info_level_grpc_log_warn",
telemetry.LogsConfig{
Level: zapcore.InfoLevel,
zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.InfoLevel),
Encoding: "console",
},
false,
true,
},
{
"collector_debug_level_grpc_log_debug",
telemetry.LogsConfig{
Level: zapcore.DebugLevel,
zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
Encoding: "console",
},
true,
true,
},
{
"collector_warn_level_grpc_log_warn",
telemetry.LogsConfig{
zap.Config{
Development: false, // this must set the grpc loggerV2 to loggerV2
Level: zapcore.WarnLevel,
Level: zap.NewAtomicLevelAt(zapcore.WarnLevel),
Encoding: "console",
},
false,
Expand All @@ -74,11 +72,11 @@ func TestGRPCLogger(t *testing.T) {
})

// create new collector zap logger
logger, err := NewLogger(test.cfg, []zap.Option{hook})
logger, err := test.cfg.Build(hook)
assert.NoError(t, err)

// create colGRPCLogger
glogger := SetColGRPCLogger(logger, test.cfg.Level)
glogger := SetLogger(logger, test.cfg.Level.Level())
assert.NotNil(t, glogger)

glogger.Info(test.name)
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry"
package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"

import (
"os"
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry
package proctelemetry

import (
"testing"
Expand Down
35 changes: 19 additions & 16 deletions service/service.go
Expand Up @@ -20,23 +20,22 @@ import (
"runtime"

"go.opentelemetry.io/otel/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/telemetry"
)

// service represents the implementation of a component.Host.
type service struct {
buildInfo component.BuildInfo
config *Config
telemetry *telemetry.Telemetry
telemetrySettings component.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
Expand All @@ -46,15 +45,6 @@ func newService(set *settings) (*service, error) {
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
telemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: set.Config.Telemetry.Metrics.Level,
},
host: &serviceHost{
factories: set.Factories,
buildInfo: set.BuildInfo,
Expand All @@ -64,9 +54,17 @@ func newService(set *settings) (*service, error) {
}

var err error
if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{
ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
srv.telemetrySettings = component.TelemetrySettings{
Logger: srv.telemetry.Logger(),
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: set.Config.Service.Telemetry.Metrics.Level,
}

if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
Expand Down Expand Up @@ -128,7 +126,12 @@ func (srv *service) Shutdown(ctx context.Context) error {
}

srv.telemetrySettings.Logger.Info("Shutdown complete.")
// TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger.

if err := srv.telemetry.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
}

// TODO: Shutdown MeterProvider.
return errs
}

Expand Down Expand Up @@ -161,7 +164,7 @@ func (srv *service) initExtensionsAndPipeline(set *settings) error {

if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/collector/service/internal"
package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -28,7 +28,7 @@ func (r recordSampler) Description() string {
return "Always record sampler"
}

func AlwaysRecord() sdktrace.Sampler {
func alwaysRecord() sdktrace.Sampler {
rs := &recordSampler{}
return sdktrace.ParentBased(
rs,
Expand Down
117 changes: 117 additions & 0 deletions service/telemetry/telemetry.go
@@ -0,0 +1,117 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Telemetry struct {
logger *zap.Logger
tracerProvider *sdktrace.TracerProvider
}

func (t *Telemetry) TracerProvider() trace.TracerProvider {
return t.tracerProvider
}

func (t *Telemetry) Logger() *zap.Logger {
return t.logger
}

func (t *Telemetry) Shutdown(ctx context.Context) error {
// TODO: Sync logger.
return multierr.Combine(
t.tracerProvider.Shutdown(ctx),
)
}

// Settings holds configuration for building Telemetry.
type Settings struct {
ZapOptions []zap.Option
}

// New creates a new Telemetry from Config.
func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) {
logger, err := newLogger(cfg.Logs, set.ZapOptions)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(alwaysRecord()),
)
// TODO: Remove when https://github.com/open-telemetry/opentelemetry-go/pull/3268 released.
// For the moment, register and unregister so shutdown does not fail.
sp := &nopSpanProcessor{}
tp.RegisterSpanProcessor(sp)
tp.UnregisterSpanProcessor(sp)
return &Telemetry{
logger: logger,
tracerProvider: tp,
}, nil
}

func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Development: cfg.Development,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: cfg.Encoding,
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: cfg.OutputPaths,
ErrorOutputPaths: cfg.ErrorOutputPaths,
DisableCaller: cfg.DisableCaller,
DisableStacktrace: cfg.DisableStacktrace,
InitialFields: cfg.InitialFields,
}

if zapCfg.Encoding == "console" {
// Human-readable timestamps for console format of logs.
zapCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
}

logger, err := zapCfg.Build(options...)
if err != nil {
return nil, err
}

return logger, nil
}

type nopSpanProcessor struct {
}

func (n nopSpanProcessor) OnStart(context.Context, sdktrace.ReadWriteSpan) {}

func (n nopSpanProcessor) OnEnd(sdktrace.ReadOnlySpan) {}

func (n nopSpanProcessor) Shutdown(context.Context) error {
return nil
}

func (n nopSpanProcessor) ForceFlush(context.Context) error {
return nil
}