diff --git a/pkg/common/chunker.go b/pkg/common/chunker.go
deleted file mode 100644
index 33fc8f0fdd55..000000000000
--- a/pkg/common/chunker.go
+++ /dev/null
@@ -1,36 +0,0 @@
-package common
-
-import (
-	"bufio"
-	"errors"
-	"io"
-
-	log "github.com/sirupsen/logrus"
-)
-
-const (
-	ChunkSize = 10 * 1024
-	PeekSize  = 3 * 1024
-)
-
-func ChunkReader(r io.Reader) chan []byte {
-	chunkChan := make(chan []byte)
-	go func() {
-		defer close(chunkChan)
-		reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize)
-		for {
-			chunk := make([]byte, ChunkSize)
-			n, err := reader.Read(chunk)
-			if err != nil && !errors.Is(err, io.EOF) {
-				log.WithError(err).Error("Error chunking reader.")
-				break
-			}
-			peekData, _ := reader.Peek(PeekSize)
-			chunkChan <- append(chunk[:n], peekData...)
-			if errors.Is(err, io.EOF) {
-				break
-			}
-		}
-	}()
-	return chunkChan
-}
diff --git a/pkg/engine/engine.go b/pkg/engine/engine.go
index 196074bff513..75d813ebc697 100644
--- a/pkg/engine/engine.go
+++ b/pkg/engine/engine.go
@@ -161,74 +161,76 @@ func (e *Engine) DetectorAvgTime() map[string][]time.Duration {
 }
 
 func (e *Engine) detectorWorker(ctx context.Context) {
-	for chunk := range e.chunks {
-		fragStart, mdLine := fragmentFirstLine(chunk)
-		for _, decoder := range e.decoders {
-			var decoderType detectorspb.DecoderType
-			switch decoder.(type) {
-			case *decoders.Plain:
-				decoderType = detectorspb.DecoderType_PLAIN
-			case *decoders.Base64:
-				decoderType = detectorspb.DecoderType_BASE64
-			default:
-				logrus.Warnf("unknown decoder type: %T", decoder)
-				decoderType = detectorspb.DecoderType_UNKNOWN
-			}
-			decoded := decoder.FromChunk(chunk)
-			if decoded == nil {
-				continue
-			}
-			dataLower := strings.ToLower(string(decoded.Data))
-			for verify, detectorsSet := range e.detectors {
-				for _, detector := range detectorsSet {
-					start := time.Now()
-					foundKeyword := false
-					for _, kw := range detector.Keywords() {
-						if strings.Contains(dataLower, strings.ToLower(kw)) {
-							foundKeyword = true
-							break
-						}
-					}
-					if !foundKeyword {
-						continue
-					}
-
-					results, err := func() ([]detectors.Result, error) {
-						ctx, cancel := context.WithTimeout(ctx, time.Second*10)
-						defer cancel()
-						defer common.Recover(ctx)
-						return detector.FromData(ctx, verify, decoded.Data)
-					}()
-					if err != nil {
-						logrus.WithFields(logrus.Fields{
-							"source_type": decoded.SourceType.String(),
-							"metadata":    decoded.SourceMetadata,
-						}).WithError(err).Error("could not scan chunk")
-						continue
-					}
-					for _, result := range results {
-						if isGitSource(chunk.SourceType) {
-							offset := FragmentLineOffset(chunk, &result)
-							*mdLine = fragStart + offset
-						}
-						result.DecoderType = decoderType
-						e.results <- detectors.CopyMetadata(chunk, result)
-					}
-					if len(results) > 0 {
-						elapsed := time.Since(start)
-						detectorName := results[0].DetectorType.String()
-						avgTimeI, ok := e.detectorAvgTime.Load(detectorName)
-						var avgTime []time.Duration
-						if ok {
-							avgTime, ok = avgTimeI.([]time.Duration)
-							if !ok {
-								continue
-							}
-						}
-						avgTime = append(avgTime, elapsed)
-						e.detectorAvgTime.Store(detectorName, avgTime)
-					}
-				}
-			}
-		}
-	}
+	for originalChunk := range e.chunks {
+		for chunk := range sources.Chunker(originalChunk) {
+			fragStart, mdLine := fragmentFirstLine(chunk)
+			for _, decoder := range e.decoders {
+				var decoderType detectorspb.DecoderType
+				switch decoder.(type) {
+				case *decoders.Plain:
+					decoderType = detectorspb.DecoderType_PLAIN
+				case *decoders.Base64:
+					decoderType = detectorspb.DecoderType_BASE64
+				default:
+					logrus.Warnf("unknown decoder type: %T", decoder)
+					decoderType = detectorspb.DecoderType_UNKNOWN
+				}
+				decoded := decoder.FromChunk(chunk)
+				if decoded == nil {
+					continue
+				}
+				dataLower := strings.ToLower(string(decoded.Data))
+				for verify, detectorsSet := range e.detectors {
+					for _, detector := range detectorsSet {
+						start := time.Now()
+						foundKeyword := false
+						for _, kw := range detector.Keywords() {
+							if strings.Contains(dataLower, strings.ToLower(kw)) {
+								foundKeyword = true
+								break
+							}
+						}
+						if !foundKeyword {
+							continue
+						}
+						results, err := func() ([]detectors.Result, error) {
+							ctx, cancel := context.WithTimeout(ctx, time.Second*10)
+							defer cancel()
+							defer common.Recover(ctx)
+							return detector.FromData(ctx, verify, decoded.Data)
+						}()
+						if err != nil {
+							logrus.WithFields(logrus.Fields{
+								"source_type": decoded.SourceType.String(),
+								"metadata":    decoded.SourceMetadata,
+							}).WithError(err).Error("could not scan chunk")
+							continue
+						}
+						for _, result := range results {
+							if isGitSource(chunk.SourceType) {
+								offset := FragmentLineOffset(chunk, &result)
+								*mdLine = fragStart + offset
+							}
+							result.DecoderType = decoderType
+							e.results <- detectors.CopyMetadata(chunk, result)
+
+						}
+						if len(results) > 0 {
+							elapsed := time.Since(start)
+							detectorName := results[0].DetectorType.String()
+							avgTimeI, ok := e.detectorAvgTime.Load(detectorName)
+							var avgTime []time.Duration
+							if ok {
+								avgTime, ok = avgTimeI.([]time.Duration)
+								if !ok {
+									continue
+								}
+							}
+							avgTime = append(avgTime, elapsed)
+							e.detectorAvgTime.Store(detectorName, avgTime)
+						}
+					}
+				}
+			}
+		}
+	}
diff --git a/pkg/sources/chunker.go b/pkg/sources/chunker.go
new file mode 100644
index 000000000000..ad8d88acaf08
--- /dev/null
+++ b/pkg/sources/chunker.go
@@ -0,0 +1,47 @@
+package sources
+
+import (
+	"bufio"
+	"bytes"
+	"errors"
+	"io"
+
+	"github.com/sirupsen/logrus"
+)
+
+const (
+	// ChunkSize is the maximum size of a chunk.
+	ChunkSize = 10 * 1024
+	// PeekSize is the size of the peek into the previous chunk.
+	PeekSize = 3 * 1024
+)
+
+// Chunker takes a chunk and splits it into chunks of ChunkSize.
+func Chunker(originalChunk *Chunk) chan *Chunk {
+	chunkChan := make(chan *Chunk)
+	go func() {
+		defer close(chunkChan)
+		if len(originalChunk.Data) <= ChunkSize+PeekSize {
+			chunkChan <- originalChunk
+			return
+		}
+		r := bytes.NewReader(originalChunk.Data)
+		reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize)
+		for {
+			chunkBytes := make([]byte, ChunkSize)
+			chunk := *originalChunk
+			n, err := reader.Read(chunkBytes)
+			if err != nil && !errors.Is(err, io.EOF) {
+				logrus.WithError(err).Error("Error chunking reader.")
+				break
+			}
+			peekData, _ := reader.Peek(PeekSize)
+			chunk.Data = append(chunkBytes[:n], peekData...)
+			chunkChan <- &chunk
+			if errors.Is(err, io.EOF) {
+				break
+			}
+		}
+	}()
+	return chunkChan
+}
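For reviewers, a minimal usage sketch of the new `sources.Chunker` (illustrative only, not part of the patch; it assumes nothing about `sources.Chunk` beyond its `Data` field):

```go
// Illustrative sketch, not part of this diff.
package main

import (
	"fmt"

	"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

func main() {
	// Payloads of at most ChunkSize+PeekSize (13 KiB) pass through as a
	// single chunk. Larger payloads are split into ChunkSize pieces, each
	// carrying up to PeekSize bytes of the data that follows it, so that
	// secrets straddling a chunk boundary remain detectable.
	original := &sources.Chunk{Data: make([]byte, 16*1024)}

	for chunk := range sources.Chunker(original) {
		fmt.Printf("chunk of %d bytes\n", len(chunk.Data))
	}
}
```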
diff --git a/pkg/common/chunker_test.go b/pkg/sources/chunker_test.go
similarity index 81%
rename from pkg/common/chunker_test.go
rename to pkg/sources/chunker_test.go
index 01949f325897..b17ae43e9a99 100644
--- a/pkg/common/chunker_test.go
+++ b/pkg/sources/chunker_test.go
@@ -1,4 +1,4 @@
-package common
+package sources
 
 import (
 	"bufio"
@@ -55,16 +55,20 @@ func TestChunker(t *testing.T) {
 		_ = reReader.Reset()
 
 		testChunkCount := 0
-		for chunk := range ChunkReader(reReader) {
+		chunkData, _ := io.ReadAll(reReader)
+		originalChunk := &Chunk{
+			Data: chunkData,
+		}
+		for chunk := range Chunker(originalChunk) {
 			testChunkCount++
 			switch testChunkCount {
 			case 1:
-				if !bytes.Equal(baseChunkOne, chunk) {
-					t.Errorf("First chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(baseChunkOne))
+				if !bytes.Equal(baseChunkOne, chunk.Data) {
+					t.Errorf("First chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk.Data), len(baseChunkOne))
 				}
 			case 2:
-				if !bytes.Equal(baseChunkTwo, chunk) {
-					t.Errorf("Second chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(baseChunkTwo))
+				if !bytes.Equal(baseChunkTwo, chunk.Data) {
+					t.Errorf("Second chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk.Data), len(baseChunkTwo))
 				}
 			}
 		}
diff --git a/pkg/sources/filesystem/filesystem.go b/pkg/sources/filesystem/filesystem.go
index c92ece40e273..2a9642fc9bf5 100644
--- a/pkg/sources/filesystem/filesystem.go
+++ b/pkg/sources/filesystem/filesystem.go
@@ -13,7 +13,6 @@ import (
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
 
-	"github.com/trufflesecurity/trufflehog/v3/pkg/common"
 	"github.com/trufflesecurity/trufflehog/v3/pkg/context"
 	"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
 	"github.com/trufflesecurity/trufflehog/v3/pkg/pb/source_metadatapb"
@@ -140,22 +139,23 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
 				return err
 			}
 			reReader.Stop()
-
-			for chunkData := range common.ChunkReader(reReader) {
-				chunksChan <- &sources.Chunk{
-					SourceType: s.Type(),
-					SourceName: s.name,
-					SourceID:   s.SourceID(),
-					Data:       chunkData,
-					SourceMetadata: &source_metadatapb.MetaData{
-						Data: &source_metadatapb.MetaData_Filesystem{
-							Filesystem: &source_metadatapb.Filesystem{
-								File: sanitizer.UTF8(path),
-							},
+			data, err := io.ReadAll(reReader)
+			if err != nil {
+				return err
+			}
+			chunksChan <- &sources.Chunk{
+				SourceType: s.Type(),
+				SourceName: s.name,
+				SourceID:   s.SourceID(),
+				Data:       data,
+				SourceMetadata: &source_metadatapb.MetaData{
+					Data: &source_metadatapb.MetaData_Filesystem{
+						Filesystem: &source_metadatapb.Filesystem{
+							File: sanitizer.UTF8(path),
 						},
 					},
-					Verify: s.verify,
-				}
+				},
+				Verify: s.verify,
 			}
 			return nil
 		})
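One property worth calling out, sketched below (illustrative only, not part of the patch; the token is a made-up placeholder): because each emitted chunk includes a PeekSize look-ahead, a token that straddles the 10 KiB boundary should still appear whole in the first chunk.

```go
// Illustrative sketch, not part of this diff. "AKIAFAKEEXAMPLE0" is a
// hypothetical secret-like token used only for demonstration.
package main

import (
	"bytes"
	"fmt"

	"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

func main() {
	token := []byte("AKIAFAKEEXAMPLE0")

	// Place the token so it crosses the ChunkSize boundary.
	data := make([]byte, 20*1024)
	copy(data[sources.ChunkSize-4:], token)

	first := true
	for chunk := range sources.Chunker(&sources.Chunk{Data: data}) {
		if first {
			// The PeekSize overlap keeps the straddling token intact.
			fmt.Println(bytes.Contains(chunk.Data, token)) // true
			first = false
		}
	}
}
```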
diff --git a/pkg/sources/git/git.go b/pkg/sources/git/git.go
index 83903eae06b6..e41db64af163 100644
--- a/pkg/sources/git/git.go
+++ b/pkg/sources/git/git.go
@@ -27,7 +27,6 @@ import (
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/types/known/anypb"
 
-	"github.com/trufflesecurity/trufflehog/v3/pkg/common"
 	"github.com/trufflesecurity/trufflehog/v3/pkg/context"
 	"github.com/trufflesecurity/trufflehog/v3/pkg/gitparse"
 	"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
@@ -373,7 +372,7 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string
 				continue
 			}
 
-			if diff.Content.Len() > common.ChunkSize+common.PeekSize {
+			if diff.Content.Len() > sources.ChunkSize+sources.PeekSize {
 				s.gitChunk(diff, fileName, email, hash, when, urlMetadata, chunksChan)
 				continue
 			}
@@ -397,7 +396,7 @@ func (s *Git) gitChunk(diff gitparse.Diff, fileName, email, hash, when, urlMetad
 	lastOffset := 0
 	for offset := 0; originalChunk.Scan(); offset++ {
 		line := originalChunk.Bytes()
-		if len(line) > common.ChunkSize || len(line)+newChunkBuffer.Len() > common.ChunkSize {
+		if len(line) > sources.ChunkSize || len(line)+newChunkBuffer.Len() > sources.ChunkSize {
 			// Add oversize chunk info
 			if newChunkBuffer.Len() > 0 {
 				// Send the existing fragment.
@@ -413,7 +412,7 @@ func (s *Git) gitChunk(diff gitparse.Diff, fileName, email, hash, when, urlMetad
 			newChunkBuffer.Reset()
 			lastOffset = offset
 		}
-		if len(line) > common.ChunkSize {
+		if len(line) > sources.ChunkSize {
 			// Send the oversize line.
 			metadata := s.sourceMetadataFunc(fileName, email, hash, when, urlMetadata, int64(diff.LineStart+offset))
 			chunksChan <- &sources.Chunk{
@@ -816,11 +815,12 @@ func handleBinary(repo *git.Repository, chunksChan chan *sources.Chunk, chunkSke
 	}
 	reader.Stop()
 
-	chunk := *chunkSkel
 	chunkData, err := io.ReadAll(reader)
 	if err != nil {
 		return err
 	}
+
+	chunk := *chunkSkel
 	chunk.Data = chunkData
 	chunksChan <- &chunk
 
diff --git a/pkg/sources/s3/s3.go b/pkg/sources/s3/s3.go
index 582740359e00..6c5dbb59c8c4 100644
--- a/pkg/sources/s3/s3.go
+++ b/pkg/sources/s3/s3.go
@@ -2,6 +2,7 @@ package s3
 
 import (
 	"fmt"
+	"io"
 	"strings"
 	"sync"
 	"time"
@@ -295,11 +296,15 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
 			}
 			reader.Stop()
 
-			for chunkData := range common.ChunkReader(reader) {
-				chunk := *chunkSkel
-				chunk.Data = chunkData
-				chunksChan <- &chunk
+			chunk := *chunkSkel
+			chunkData, err := io.ReadAll(reader)
+			if err != nil {
+				log.WithError(err).Error("Could not read file data.")
+				return
 			}
+			chunk.Data = chunkData
+			chunksChan <- &chunk
+
 			nErr, ok = errorCount.Load(prefix)
 			if !ok {
 				nErr = 0
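Taken together, the diff moves chunking out of the individual sources and into the engine: sources now emit one whole Chunk per object via io.ReadAll, and detectorWorker re-chunks centrally. A minimal end-to-end sketch of that division of labor (illustrative only, not part of the patch; the placeholder byte slice stands in for real source data):

```go
// Illustrative sketch, not part of this diff.
package main

import (
	"fmt"

	"github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

func main() {
	chunksChan := make(chan *sources.Chunk, 1)

	// Source side: emit the whole payload as a single chunk, as the
	// filesystem and s3 sources now do after reading with io.ReadAll.
	chunksChan <- &sources.Chunk{Data: make([]byte, 32*1024)}
	close(chunksChan)

	// Engine side: split each incoming chunk before running detectors,
	// mirroring the new loop in detectorWorker.
	for originalChunk := range chunksChan {
		for chunk := range sources.Chunker(originalChunk) {
			fmt.Printf("scanning %d bytes\n", len(chunk.Data))
		}
	}
}
```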