diff --git a/pkg/common/chunker.go b/pkg/common/chunker.go index 33fc8f0fdd55..9aca5053dad1 100644 --- a/pkg/common/chunker.go +++ b/pkg/common/chunker.go @@ -1,36 +1,6 @@ package common -import ( - "bufio" - "errors" - "io" - - log "github.com/sirupsen/logrus" -) - const ( ChunkSize = 10 * 1024 PeekSize = 3 * 1024 ) - -func ChunkReader(r io.Reader) chan []byte { - chunkChan := make(chan []byte) - go func() { - defer close(chunkChan) - reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize) - for { - chunk := make([]byte, ChunkSize) - n, err := reader.Read(chunk) - if err != nil && !errors.Is(err, io.EOF) { - log.WithError(err).Error("Error chunking reader.") - break - } - peekData, _ := reader.Peek(PeekSize) - chunkChan <- append(chunk[:n], peekData...) - if errors.Is(err, io.EOF) { - break - } - } - }() - return chunkChan -} diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go index 196074bff513..75d813ebc697 100644 --- a/pkg/engine/engine.go +++ b/pkg/engine/engine.go @@ -161,74 +161,76 @@ func (e *Engine) DetectorAvgTime() map[string][]time.Duration { } func (e *Engine) detectorWorker(ctx context.Context) { - for chunk := range e.chunks { - fragStart, mdLine := fragmentFirstLine(chunk) - for _, decoder := range e.decoders { - var decoderType detectorspb.DecoderType - switch decoder.(type) { - case *decoders.Plain: - decoderType = detectorspb.DecoderType_PLAIN - case *decoders.Base64: - decoderType = detectorspb.DecoderType_BASE64 - default: - logrus.Warnf("unknown decoder type: %T", decoder) - decoderType = detectorspb.DecoderType_UNKNOWN - } - decoded := decoder.FromChunk(chunk) - if decoded == nil { - continue - } - dataLower := strings.ToLower(string(decoded.Data)) - for verify, detectorsSet := range e.detectors { - for _, detector := range detectorsSet { - start := time.Now() - foundKeyword := false - for _, kw := range detector.Keywords() { - if strings.Contains(dataLower, strings.ToLower(kw)) { - foundKeyword = true - break + for originalChunk := range e.chunks { + for chunk := range sources.Chunker(originalChunk) { + fragStart, mdLine := fragmentFirstLine(chunk) + for _, decoder := range e.decoders { + var decoderType detectorspb.DecoderType + switch decoder.(type) { + case *decoders.Plain: + decoderType = detectorspb.DecoderType_PLAIN + case *decoders.Base64: + decoderType = detectorspb.DecoderType_BASE64 + default: + logrus.Warnf("unknown decoder type: %T", decoder) + decoderType = detectorspb.DecoderType_UNKNOWN + } + decoded := decoder.FromChunk(chunk) + if decoded == nil { + continue + } + dataLower := strings.ToLower(string(decoded.Data)) + for verify, detectorsSet := range e.detectors { + for _, detector := range detectorsSet { + start := time.Now() + foundKeyword := false + for _, kw := range detector.Keywords() { + if strings.Contains(dataLower, strings.ToLower(kw)) { + foundKeyword = true + break + } + } + if !foundKeyword { + continue } - } - if !foundKeyword { - continue - } - - results, err := func() ([]detectors.Result, error) { - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - defer common.Recover(ctx) - return detector.FromData(ctx, verify, decoded.Data) - }() - if err != nil { - logrus.WithFields(logrus.Fields{ - "source_type": decoded.SourceType.String(), - "metadata": decoded.SourceMetadata, - }).WithError(err).Error("could not scan chunk") - continue - } - for _, result := range results { - if isGitSource(chunk.SourceType) { - offset := FragmentLineOffset(chunk, &result) - *mdLine = fragStart + offset + results, err := func() ([]detectors.Result, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + defer common.Recover(ctx) + return detector.FromData(ctx, verify, decoded.Data) + }() + if err != nil { + logrus.WithFields(logrus.Fields{ + "source_type": decoded.SourceType.String(), + "metadata": decoded.SourceMetadata, + }).WithError(err).Error("could not scan chunk") + continue } - result.DecoderType = decoderType - e.results <- detectors.CopyMetadata(chunk, result) - } - if len(results) > 0 { - elapsed := time.Since(start) - detectorName := results[0].DetectorType.String() - avgTimeI, ok := e.detectorAvgTime.Load(detectorName) - var avgTime []time.Duration - if ok { - avgTime, ok = avgTimeI.([]time.Duration) - if !ok { - continue + for _, result := range results { + if isGitSource(chunk.SourceType) { + offset := FragmentLineOffset(chunk, &result) + *mdLine = fragStart + offset + } + result.DecoderType = decoderType + e.results <- detectors.CopyMetadata(chunk, result) + + } + if len(results) > 0 { + elapsed := time.Since(start) + detectorName := results[0].DetectorType.String() + avgTimeI, ok := e.detectorAvgTime.Load(detectorName) + var avgTime []time.Duration + if ok { + avgTime, ok = avgTimeI.([]time.Duration) + if !ok { + continue + } } + avgTime = append(avgTime, elapsed) + e.detectorAvgTime.Store(detectorName, avgTime) } - avgTime = append(avgTime, elapsed) - e.detectorAvgTime.Store(detectorName, avgTime) } } } diff --git a/pkg/sources/filesystem/filesystem.go b/pkg/sources/filesystem/filesystem.go index c92ece40e273..2a9642fc9bf5 100644 --- a/pkg/sources/filesystem/filesystem.go +++ b/pkg/sources/filesystem/filesystem.go @@ -13,7 +13,6 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" - "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" "github.com/trufflesecurity/trufflehog/v3/pkg/handlers" "github.com/trufflesecurity/trufflehog/v3/pkg/pb/source_metadatapb" @@ -140,22 +139,23 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err return err } reReader.Stop() - - for chunkData := range common.ChunkReader(reReader) { - chunksChan <- &sources.Chunk{ - SourceType: s.Type(), - SourceName: s.name, - SourceID: s.SourceID(), - Data: chunkData, - SourceMetadata: &source_metadatapb.MetaData{ - Data: &source_metadatapb.MetaData_Filesystem{ - Filesystem: &source_metadatapb.Filesystem{ - File: sanitizer.UTF8(path), - }, + data, err := io.ReadAll(reReader) + if err != nil { + return err + } + chunksChan <- &sources.Chunk{ + SourceType: s.Type(), + SourceName: s.name, + SourceID: s.SourceID(), + Data: data, + SourceMetadata: &source_metadatapb.MetaData{ + Data: &source_metadatapb.MetaData_Filesystem{ + Filesystem: &source_metadatapb.Filesystem{ + File: sanitizer.UTF8(path), }, }, - Verify: s.verify, - } + }, + Verify: s.verify, } return nil }) diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go index b74f501be330..286852ab0e92 100644 --- a/pkg/sources/git/git.go +++ b/pkg/sources/git/git.go @@ -2,6 +2,7 @@ package git import ( "fmt" + "io" "io/ioutil" "net/url" "os" @@ -24,7 +25,6 @@ import ( "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/anypb" - "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" "github.com/trufflesecurity/trufflehog/v3/pkg/gitparse" "github.com/trufflesecurity/trufflehog/v3/pkg/handlers" @@ -753,11 +753,13 @@ func handleBinary(repo *git.Repository, chunksChan chan *sources.Chunk, chunkSke } reader.Stop() - for chunkData := range common.ChunkReader(reader) { - chunk := *chunkSkel - chunk.Data = chunkData - chunksChan <- &chunk + chunkData, err := io.ReadAll(reader) + if err != nil { + return err } + chunk := *chunkSkel + chunk.Data = chunkData + chunksChan <- &chunk return nil } diff --git a/pkg/sources/s3/s3.go b/pkg/sources/s3/s3.go index 582740359e00..6c5dbb59c8c4 100644 --- a/pkg/sources/s3/s3.go +++ b/pkg/sources/s3/s3.go @@ -2,6 +2,7 @@ package s3 import ( "fmt" + "io" "strings" "sync" "time" @@ -295,11 +296,15 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan } reader.Stop() - for chunkData := range common.ChunkReader(reader) { - chunk := *chunkSkel - chunk.Data = chunkData - chunksChan <- &chunk + chunk := *chunkSkel + chunkData, err := io.ReadAll(reader) + if err != nil { + log.WithError(err).Error("Could not read file data.") + return } + chunk.Data = chunkData + chunksChan <- &chunk + nErr, ok = errorCount.Load(prefix) if !ok { nErr = 0 diff --git a/pkg/sources/sources.go b/pkg/sources/sources.go index 310492e1fcfc..c0ee97f06504 100644 --- a/pkg/sources/sources.go +++ b/pkg/sources/sources.go @@ -1,10 +1,15 @@ package sources import ( + "bufio" + "bytes" + "errors" + "io" "sync" "google.golang.org/protobuf/types/known/anypb" + "github.com/sirupsen/logrus" "github.com/trufflesecurity/trufflehog/v3/pkg/common" "github.com/trufflesecurity/trufflehog/v3/pkg/context" "github.com/trufflesecurity/trufflehog/v3/pkg/pb/source_metadatapb" @@ -143,3 +148,33 @@ func (p *Progress) GetProgress() *Progress { defer p.mut.Unlock() return p } + +// Chunker takes a chunk and splits it into chunks of common.ChunkSize. +func Chunker(originalChunk *Chunk) chan *Chunk { + chunkChan := make(chan *Chunk) + go func() { + defer close(chunkChan) + if len(originalChunk.Data) <= common.ChunkSize+common.PeekSize { + chunkChan <- originalChunk + return + } + r := bytes.NewReader(originalChunk.Data) + reader := bufio.NewReaderSize(bufio.NewReader(r), common.ChunkSize) + for { + chunkBytes := make([]byte, common.ChunkSize) + chunk := *originalChunk + n, err := reader.Read(chunkBytes) + if err != nil && !errors.Is(err, io.EOF) { + logrus.WithError(err).Error("Error chunking reader.") + break + } + peekData, _ := reader.Peek(common.PeekSize) + chunk.Data = append(chunkBytes[:n], peekData...) + chunkChan <- &chunk + if errors.Is(err, io.EOF) { + break + } + } + }() + return chunkChan +}