From 36ca2601e033a8b43429560d1bbb7e4460560739 Mon Sep 17 00:00:00 2001
From: Bill Rich
Date: Tue, 13 Dec 2022 16:46:09 -0800
Subject: [PATCH] Add s3 object count to trace logs (#975)

* Add s3 object count to trace logs

* fix debug level
---
 main.go              | 12 ++++++++----
 pkg/sources/s3/s3.go | 13 ++++++++++---
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/main.go b/main.go
index 40b9fbc3d53d..3835ca5f00e2 100644
--- a/main.go
+++ b/main.go
@@ -93,7 +93,7 @@ var (
 	syslogTLSKey  = syslogScan.Flag("key", "Path to TLS key.").String()
 	syslogFormat  = syslogScan.Flag("format", "Log format. Can be rfc3164 or rfc5424").String()
 
-	stderrLevel = zap.NewAtomicLevel()
+	logLevel = zap.NewAtomicLevel()
 )
 
 func init() {
@@ -113,9 +113,13 @@ func init() {
 	}
 	switch {
 	case *trace:
+		log.SetLevel(5)
+		log.SetLevelForControl(logLevel, 5)
 		logrus.SetLevel(logrus.TraceLevel)
 		logrus.Debugf("running version %s", version.BuildVersion)
 	case *debug:
+		log.SetLevel(2)
+		log.SetLevelForControl(logLevel, 2)
 		logrus.SetLevel(logrus.DebugLevel)
 		logrus.Debugf("running version %s", version.BuildVersion)
 	default:
@@ -172,11 +176,11 @@ func run(state overseer.State) {
 			}
 		}()
 	}
-	logger, sync := log.New("trufflehog", log.WithConsoleSink(os.Stderr, log.WithLeveler(stderrLevel)))
-	context.SetDefaultLogger(logger)
+	logger, sync := log.New("trufflehog", log.WithConsoleSink(os.Stderr, log.WithLeveler(logLevel)))
+	ctx := context.WithLogger(context.TODO(), logger)
+
 	defer func() { _ = sync() }()
 
-	ctx := context.TODO()
 	e := engine.Start(ctx,
 		engine.WithConcurrency(*concurrency),
 		engine.WithDecoders(decoders.DefaultDecoders()...),
diff --git a/pkg/sources/s3/s3.go b/pkg/sources/s3/s3.go
index f895d805f299..eb55fca2806c 100644
--- a/pkg/sources/s3/s3.go
+++ b/pkg/sources/s3/s3.go
@@ -5,6 +5,7 @@ import (
 	"io"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/aws/aws-sdk-go/aws"
@@ -68,6 +69,7 @@ func (s *Source) Init(aCtx context.Context, name string, jobId, sourceId int64,
 	s.verify = verify
 	s.concurrency = concurrency
 	s.errorCount = &sync.Map{}
+	s.log = aCtx.Logger()
 
 	var conn sourcespb.S3
 	err := anypb.UnmarshalTo(connection, &conn, proto.UnmarshalOptions{})
@@ -137,6 +139,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
 		return errors.Errorf("invalid configuration given for %s source", s.name)
 	}
 
+	objectCount := uint64(0)
 	for i, bucket := range bucketsToScan {
 		if common.IsDone(ctx) {
 			return nil
@@ -165,7 +168,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
 		err = regionalClient.ListObjectsV2PagesWithContext(
 			ctx, &s3.ListObjectsV2Input{Bucket: &bucket},
 			func(page *s3.ListObjectsV2Output, last bool) bool {
-				s.pageChunker(ctx, regionalClient, chunksChan, bucket, page, &errorCount)
+				s.pageChunker(ctx, regionalClient, chunksChan, bucket, page, &errorCount, i+1, &objectCount)
 				return true
 			})
 
@@ -173,13 +176,13 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
 			s.log.Error(err, "could not list objects in s3 bucket", "bucket", bucket)
 		}
 	}
-	s.SetProgressComplete(len(bucketsToScan), len(bucketsToScan), fmt.Sprintf("Completed scanning source %s", s.name), "")
+	s.SetProgressComplete(len(bucketsToScan), len(bucketsToScan), fmt.Sprintf("Completed scanning source %s. %d objects scanned.", s.name, objectCount), "")
 	return nil
 }
 
 // pageChunker emits chunks onto the given channel from a page
-func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan *sources.Chunk, bucket string, page *s3.ListObjectsV2Output, errorCount *sync.Map) {
+func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan *sources.Chunk, bucket string, page *s3.ListObjectsV2Output, errorCount *sync.Map, pageNumber int, objectCount *uint64) {
 	sem := semaphore.NewWeighted(int64(s.concurrency))
 	var wg sync.WaitGroup
 	for _, obj := range page.Contents {
@@ -303,6 +306,8 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 				Verify: s.verify,
 			}
 			if handlers.HandleFile(ctx, reader, chunkSkel, chunksChan) {
+				atomic.AddUint64(objectCount, 1)
+				s.log.V(5).Info("S3 object scanned.", "object_count", objectCount, "page_number", pageNumber)
 				return
 			}
 
@@ -317,6 +322,8 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 				s.log.Error(err, "Could not read file data.")
 				return
 			}
+			atomic.AddUint64(objectCount, 1)
+			s.log.V(5).Info("S3 object scanned.", "object_count", objectCount, "page_number", pageNumber)
 			chunk.Data = chunkData
 			chunksChan <- &chunk
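
A note on the level plumbing in the main.go hunks: TruffleHog's pkg/log layers logr-style verbosity on top of zap, and SetLevelForControl points the shared zap.AtomicLevel (logLevel) at the chosen verbosity so that V(5) trace lines such as "S3 object scanned." actually reach the console sink. Below is a minimal standalone sketch of that mechanism, assuming the stock zapr convention that logr's V(n) maps to zap level -n; the zapr wiring and level values here are an illustration, not the pkg/log implementation.

package main

import (
	"os"

	"github.com/go-logr/zapr"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	// A zap.AtomicLevel can be adjusted at runtime, which is why the
	// patch keeps one (logLevel) as a package-level control.
	logLevel := zap.NewAtomicLevel()

	core := zapcore.NewCore(
		zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
		zapcore.Lock(os.Stderr),
		logLevel,
	)
	logger := zapr.NewLogger(zap.New(core))

	// zapr logs logr.V(n) at zap level -n, so admitting verbosity 5
	// means lowering the threshold to -5 (roughly what
	// log.SetLevelForControl(logLevel, 5) arranges for --trace).
	logLevel.SetLevel(zapcore.Level(-5))
	logger.V(5).Info("visible at trace verbosity")

	// --debug maps to verbosity 2, which suppresses V(5) lines.
	logLevel.SetLevel(zapcore.Level(-2))
	logger.V(5).Info("now filtered out")
	logger.V(2).Info("still visible")
}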
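The s3.go hunks thread one shared counter through concurrent page workers, which is why the increment goes through sync/atomic rather than a plain objectCount++. A minimal runnable sketch of that pattern, with scanPage as a hypothetical stand-in for pageChunker and fixed page sizes in place of real S3 listings:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// scanPage bumps the shared counter once per object it scans, mirroring
// the atomic.AddUint64(objectCount, 1) calls added to pageChunker.
func scanPage(wg *sync.WaitGroup, objectCount *uint64, objectsOnPage int) {
	defer wg.Done()
	for i := 0; i < objectsOnPage; i++ {
		atomic.AddUint64(objectCount, 1)
	}
}

func main() {
	var (
		objectCount uint64
		wg          sync.WaitGroup
	)
	// Four concurrent "pages", like the ListObjectsV2 pages fanned out
	// under the semaphore in pageChunker.
	for page := 0; page < 4; page++ {
		wg.Add(1)
		go scanPage(&wg, &objectCount, 100)
	}
	wg.Wait()
	// Read with atomic.LoadUint64 for a race-free value; the format
	// string mirrors the final SetProgressComplete message in Chunks.
	fmt.Printf("Completed scanning source s3. %d objects scanned.\n",
		atomic.LoadUint64(&objectCount))
}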