Skip to content

Commit

Permalink
Start moving telemetry initialization to service/telemetry
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Oct 11, 2022
1 parent 01e4d0c commit ff89980
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 63 deletions.
4 changes: 2 additions & 2 deletions service/collector.go
Expand Up @@ -31,7 +31,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/extension/ballastextension"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/internal/grpclog"
)

// State defines Collector's state.
Expand Down Expand Up @@ -145,7 +145,7 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

if !col.set.SkipSettingGRPCLogger {
telemetrylogs.SetColGRPCLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
grpclog.SetLogger(col.service.telemetrySettings.Logger, cfg.Service.Telemetry.Logs.Level)
}

if err = col.service.Start(ctx); err != nil {
Expand Down
31 changes: 31 additions & 0 deletions service/internal/grpclog/logger.go
@@ -0,0 +1,31 @@
package grpclog

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc/grpclog"
)

// SetLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with
// exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to
// INFO to avoid copious logging from grpc framework.
func SetLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger {
logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
var c zapcore.Core
var err error
if loglevel == zapcore.InfoLevel {
loglevel = zapcore.WarnLevel
}
// NewIncreaseLevelCore errors only if the new log level is less than the initial core level.
c, err = zapcore.NewIncreaseLevelCore(core, loglevel)
// In case of an error changing the level, move on, this happens when using the NopCore
if err != nil {
c = core
}
return c.With([]zapcore.Field{zap.Bool("grpc_log", true)})
})))

grpclog.SetLoggerV2(logger)
return logger
}
Expand Up @@ -12,48 +12,46 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrylogs
package grpclog

import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/service/telemetry"
)

func TestGRPCLogger(t *testing.T) {
tests := []struct {
name string
cfg telemetry.LogsConfig
cfg zap.Config
infoLogged bool
warnLogged bool
}{
{
"collector_info_level_grpc_log_warn",
telemetry.LogsConfig{
Level: zapcore.InfoLevel,
zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.InfoLevel),
Encoding: "console",
},
false,
true,
},
{
"collector_debug_level_grpc_log_debug",
telemetry.LogsConfig{
Level: zapcore.DebugLevel,
zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.DebugLevel),
Encoding: "console",
},
true,
true,
},
{
"collector_warn_level_grpc_log_warn",
telemetry.LogsConfig{
zap.Config{
Development: false, // this must set the grpc loggerV2 to loggerV2
Level: zapcore.WarnLevel,
Level: zap.NewAtomicLevelAt(zapcore.WarnLevel),
Encoding: "console",
},
false,
Expand All @@ -74,11 +72,11 @@ func TestGRPCLogger(t *testing.T) {
})

// create new collector zap logger
logger, err := NewLogger(test.cfg, []zap.Option{hook})
logger, err := test.cfg.Build(hook)
assert.NoError(t, err)

// create colGRPCLogger
glogger := SetColGRPCLogger(logger, test.cfg.Level)
glogger := SetLogger(logger, test.cfg.Level.Level())
assert.NotNil(t, glogger)

glogger.Info(test.name)
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry // import "go.opentelemetry.io/collector/service/internal/telemetry"
package proctelemetry // import "go.opentelemetry.io/collector/service/internal/proctelemetry"

import (
"os"
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetry
package proctelemetry

import (
"testing"
Expand Down
35 changes: 19 additions & 16 deletions service/service.go
Expand Up @@ -20,23 +20,22 @@ import (
"runtime"

"go.opentelemetry.io/otel/metric"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/multierr"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/service/extensions"
"go.opentelemetry.io/collector/service/internal"
"go.opentelemetry.io/collector/service/internal/pipelines"
"go.opentelemetry.io/collector/service/internal/telemetry"
"go.opentelemetry.io/collector/service/internal/telemetrylogs"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/telemetry"
)

// service represents the implementation of a component.Host.
type service struct {
buildInfo component.BuildInfo
config *Config
telemetry *telemetry.Telemetry
telemetrySettings component.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
Expand All @@ -46,15 +45,6 @@ func newService(set *settings) (*service, error) {
srv := &service{
buildInfo: set.BuildInfo,
config: set.Config,
telemetrySettings: component.TelemetrySettings{
Logger: zap.NewNop(),
TracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: set.Config.Telemetry.Metrics.Level,
},
host: &serviceHost{
factories: set.Factories,
buildInfo: set.BuildInfo,
Expand All @@ -64,9 +54,17 @@ func newService(set *settings) (*service, error) {
}

var err error
if srv.telemetrySettings.Logger, err = telemetrylogs.NewLogger(set.Config.Service.Telemetry.Logs, set.LoggingOptions); err != nil {
srv.telemetry, err = telemetry.New(context.Background(), telemetry.Settings{
ZapOptions: set.LoggingOptions}, set.Config.Service.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
srv.telemetrySettings = component.TelemetrySettings{
Logger: srv.telemetry.Logger(),
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: metric.NewNoopMeterProvider(),
MetricsLevel: set.Config.Service.Telemetry.Metrics.Level,
}

if err = srv.telemetryInitializer.init(set.BuildInfo, srv.telemetrySettings.Logger, set.Config.Service.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
Expand Down Expand Up @@ -128,7 +126,12 @@ func (srv *service) Shutdown(ctx context.Context) error {
}

srv.telemetrySettings.Logger.Info("Shutdown complete.")
// TODO: Shutdown TracerProvider, MeterProvider, and Sync Logger.

if err := srv.telemetry.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
}

// TODO: Shutdown MeterProvider.
return errs
}

Expand Down Expand Up @@ -161,7 +164,7 @@ func (srv *service) initExtensionsAndPipeline(set *settings) error {

if set.Config.Telemetry.Metrics.Level != configtelemetry.LevelNone && set.Config.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = telemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.ocRegistry, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}
Expand Down
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package internal // import "go.opentelemetry.io/collector/service/internal"
package telemetry // import "go.opentelemetry.io/collector/service/internal"

import (
sdktrace "go.opentelemetry.io/otel/sdk/trace"
Expand All @@ -28,7 +28,7 @@ func (r recordSampler) Description() string {
return "Always record sampler"
}

func AlwaysRecord() sdktrace.Sampler {
func alwaysRecord() sdktrace.Sampler {
rs := &recordSampler{}
return sdktrace.ParentBased(
rs,
Expand Down
Expand Up @@ -12,18 +12,59 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package telemetrylogs // import "go.opentelemetry.io/collector/service/internal/telemetrylogs"
package telemetry

import (
"context"

sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zapgrpc"
"google.golang.org/grpc/grpclog"

"go.opentelemetry.io/collector/service/telemetry"
)

func NewLogger(cfg telemetry.LogsConfig, options []zap.Option) (*zap.Logger, error) {
type Telemetry struct {
logger *zap.Logger
tracerProvider *sdktrace.TracerProvider
}

func (t *Telemetry) TracerProvider() trace.TracerProvider {
return t.tracerProvider
}

func (t *Telemetry) Logger() *zap.Logger {
return t.logger
}

func (t *Telemetry) Shutdown(ctx context.Context) error {
// TODO: Sync logger.
return multierr.Combine(
t.tracerProvider.Shutdown(ctx),
)
}

// Settings holds configuration for building Extensions.
type Settings struct {
ZapOptions []zap.Option
}

// New creates a new Extensions from Config.
func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) {
logger, err := newLogger(cfg.Logs, set.ZapOptions)
if err != nil {
return nil, err
}
return &Telemetry{
logger: logger,
tracerProvider: sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(alwaysRecord()),
),
}, nil
}

func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.Level),
Expand Down Expand Up @@ -53,26 +94,3 @@ func NewLogger(cfg telemetry.LogsConfig, options []zap.Option) (*zap.Logger, err

return logger, nil
}

// SetColGRPCLogger constructs a zapgrpc.Logger instance, and installs it as grpc logger, cloned from baseLogger with
// exact configuration. The minimum level of gRPC logs is set to WARN should the loglevel of the collector is set to
// INFO to avoid copious logging from grpc framework.
func SetColGRPCLogger(baseLogger *zap.Logger, loglevel zapcore.Level) *zapgrpc.Logger {
logger := zapgrpc.NewLogger(baseLogger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
var c zapcore.Core
var err error
if loglevel == zapcore.InfoLevel {
loglevel = zapcore.WarnLevel
}
// NewIncreaseLevelCore errors only if the new log level is less than the initial core level.
c, err = zapcore.NewIncreaseLevelCore(core, loglevel)
// In case of an error changing the level, move on, this happens when using the NopCore
if err != nil {
c = core
}
return c.With([]zapcore.Field{zap.Bool("grpc_log", true)})
})))

grpclog.SetLoggerV2(logger)
return logger
}

0 comments on commit ff89980

Please sign in to comment.