Run chunker in pipeline (#859)
* Run chunker in pipeline

* Move ChunkSize and PeekSize to the sources package.

* Use the new ChunkSize and PeekSize location.
bill-rich committed Oct 24, 2022
1 parent 3d5f697 commit 958266e
Showing 7 changed files with 150 additions and 128 deletions.
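The shape of the change: sources now emit each blob as a single large chunk, and the engine's detector worker re-splits it with the relocated chunker before scanning. A minimal runnable sketch of that consumer side — the channel setup here is illustrative, not repo code; only Chunk and Chunker come from this commit:

package main

import (
    "bytes"
    "fmt"

    "github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

func main() {
    // A source hands over one large chunk per file...
    chunks := make(chan *sources.Chunk, 1)
    chunks <- &sources.Chunk{Data: bytes.Repeat([]byte("x"), 30*1024)}
    close(chunks)

    // ...and the consumer re-splits it before scanning, as engine.go now does.
    for originalChunk := range chunks {
        for chunk := range sources.Chunker(originalChunk) {
            fmt.Printf("scanning %d bytes\n", len(chunk.Data))
        }
    }
}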
36 changes: 0 additions & 36 deletions pkg/common/chunker.go

This file was deleted.

126 changes: 64 additions & 62 deletions pkg/engine/engine.go
@@ -161,74 +161,76 @@ func (e *Engine) DetectorAvgTime() map[string][]time.Duration {
}

func (e *Engine) detectorWorker(ctx context.Context) {
-   for chunk := range e.chunks {
+   for originalChunk := range e.chunks {
+       for chunk := range sources.Chunker(originalChunk) {
            fragStart, mdLine := fragmentFirstLine(chunk)
            for _, decoder := range e.decoders {
                var decoderType detectorspb.DecoderType
                switch decoder.(type) {
                case *decoders.Plain:
                    decoderType = detectorspb.DecoderType_PLAIN
                case *decoders.Base64:
                    decoderType = detectorspb.DecoderType_BASE64
                default:
                    logrus.Warnf("unknown decoder type: %T", decoder)
                    decoderType = detectorspb.DecoderType_UNKNOWN
                }
                decoded := decoder.FromChunk(chunk)
                if decoded == nil {
                    continue
                }
                dataLower := strings.ToLower(string(decoded.Data))
                for verify, detectorsSet := range e.detectors {
                    for _, detector := range detectorsSet {
                        start := time.Now()
                        foundKeyword := false
                        for _, kw := range detector.Keywords() {
                            if strings.Contains(dataLower, strings.ToLower(kw)) {
                                foundKeyword = true
                                break
                            }
                        }
                        if !foundKeyword {
                            continue
                        }

                        results, err := func() ([]detectors.Result, error) {
                            ctx, cancel := context.WithTimeout(ctx, time.Second*10)
                            defer cancel()
                            defer common.Recover(ctx)
                            return detector.FromData(ctx, verify, decoded.Data)
                        }()
                        if err != nil {
                            logrus.WithFields(logrus.Fields{
                                "source_type": decoded.SourceType.String(),
                                "metadata":    decoded.SourceMetadata,
                            }).WithError(err).Error("could not scan chunk")
                            continue
                        }

                        for _, result := range results {
                            if isGitSource(chunk.SourceType) {
                                offset := FragmentLineOffset(chunk, &result)
                                *mdLine = fragStart + offset
                            }
                            result.DecoderType = decoderType
                            e.results <- detectors.CopyMetadata(chunk, result)
                        }
                        if len(results) > 0 {
                            elapsed := time.Since(start)
                            detectorName := results[0].DetectorType.String()
                            avgTimeI, ok := e.detectorAvgTime.Load(detectorName)
                            var avgTime []time.Duration
                            if ok {
                                avgTime, ok = avgTimeI.([]time.Duration)
                                if !ok {
                                    continue
                                }
                            }
                            avgTime = append(avgTime, elapsed)
                            e.detectorAvgTime.Store(detectorName, avgTime)
                        }
                    }
                }
            }
+       }
    }
}
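With splitting centralized, the worker can assume a hard upper bound: nothing reaching the decoders is larger than ChunkSize+PeekSize (13,312 bytes), no matter what a source emits. A property-style check of that invariant, as a sketch — this test is mine, not part of the commit:

package sources_test

import (
    "testing"

    "github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

// TestChunkerBoundsData (hypothetical) verifies that no chunk emitted by
// Chunker exceeds ChunkSize+PeekSize, the invariant the engine now relies on.
func TestChunkerBoundsData(t *testing.T) {
    huge := &sources.Chunk{Data: make([]byte, 100*1024)}
    for chunk := range sources.Chunker(huge) {
        if len(chunk.Data) > sources.ChunkSize+sources.PeekSize {
            t.Fatalf("chunk of %d bytes exceeds ChunkSize+PeekSize", len(chunk.Data))
        }
    }
}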
47 changes: 47 additions & 0 deletions pkg/sources/chunker.go
@@ -0,0 +1,47 @@
package sources

import (
"bufio"
"bytes"
"errors"
"io"

"github.com/sirupsen/logrus"
)

const (
// ChunkSize is the maximum size of a chunk.
ChunkSize = 10 * 1024
// PeekSize is the size of the peek into the previous chunk.
PeekSize = 3 * 1024
)

// Chunker takes a chunk and splits it into chunks of ChunkSize.
func Chunker(originalChunk *Chunk) chan *Chunk {
chunkChan := make(chan *Chunk)
go func() {
defer close(chunkChan)
if len(originalChunk.Data) <= ChunkSize+PeekSize {
chunkChan <- originalChunk
return
}
r := bytes.NewReader(originalChunk.Data)
reader := bufio.NewReaderSize(bufio.NewReader(r), ChunkSize)
for {
chunkBytes := make([]byte, ChunkSize)
chunk := *originalChunk
n, err := reader.Read(chunkBytes)
if err != nil && !errors.Is(err, io.EOF) {
logrus.WithError(err).Error("Error chunking reader.")
break
}
peekData, _ := reader.Peek(PeekSize)
chunk.Data = append(chunkBytes[:n], peekData...)
chunkChan <- &chunk
if errors.Is(err, io.EOF) {
break
}
}
}()
return chunkChan
}
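The constants work out to 10,240-byte reads with a 3,072-byte look-ahead, so each emitted chunk tops out at 13,312 bytes and overlaps the next by up to 3 KiB. That overlap is the point: a secret that straddles a read boundary still appears whole in at least one chunk. A small demonstration, assuming only Chunk and Chunker from this file (the SECRETTT marker is arbitrary):

package main

import (
    "bytes"
    "fmt"

    "github.com/trufflesecurity/trufflehog/v3/pkg/sources"
)

func main() {
    // Plant a marker straddling the first ChunkSize boundary.
    data := bytes.Repeat([]byte("A"), 2*sources.ChunkSize)
    copy(data[sources.ChunkSize-4:], "SECRETTT")

    for chunk := range sources.Chunker(&sources.Chunk{Data: data}) {
        // The peek makes the first chunk carry the marker in full.
        fmt.Println(bytes.Contains(chunk.Data, []byte("SECRETTT")))
    }
}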
16 changes: 10 additions & 6 deletions pkg/common/chunker_test.go → pkg/sources/chunker_test.go
@@ -1,4 +1,4 @@
-package common
+package sources

import (
"bufio"
@@ -55,16 +55,20 @@ func TestChunker(t *testing.T) {
    _ = reReader.Reset()

    testChunkCount := 0
-   for chunk := range ChunkReader(reReader) {
+   chunkData, _ := io.ReadAll(reReader)
+   originalChunk := &Chunk{
+       Data: chunkData,
+   }
+   for chunk := range Chunker(originalChunk) {
        testChunkCount++
        switch testChunkCount {
        case 1:
-           if !bytes.Equal(baseChunkOne, chunk) {
-               t.Errorf("First chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(baseChunkOne))
+           if !bytes.Equal(baseChunkOne, chunk.Data) {
+               t.Errorf("First chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk.Data), len(baseChunkOne))
            }
        case 2:
-           if !bytes.Equal(baseChunkTwo, chunk) {
-               t.Errorf("Second chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk), len(baseChunkTwo))
+           if !bytes.Equal(baseChunkTwo, chunk.Data) {
+               t.Errorf("Second chunk did not match expected. Got: %d bytes, expected: %d bytes", len(chunk.Data), len(baseChunkTwo))
            }
        }
    }
30 changes: 15 additions & 15 deletions pkg/sources/filesystem/filesystem.go
@@ -13,7 +13,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/source_metadatapb"
@@ -140,22 +139,23 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) error {
            return err
        }
        reReader.Stop()

-       for chunkData := range common.ChunkReader(reReader) {
-           chunksChan <- &sources.Chunk{
-               SourceType: s.Type(),
-               SourceName: s.name,
-               SourceID:   s.SourceID(),
-               Data:       chunkData,
-               SourceMetadata: &source_metadatapb.MetaData{
-                   Data: &source_metadatapb.MetaData_Filesystem{
-                       Filesystem: &source_metadatapb.Filesystem{
-                           File: sanitizer.UTF8(path),
-                       },
-                   },
-               },
-               Verify: s.verify,
-           }
-       }
+       data, err := io.ReadAll(reReader)
+       if err != nil {
+           return err
+       }
+       chunksChan <- &sources.Chunk{
+           SourceType: s.Type(),
+           SourceName: s.name,
+           SourceID:   s.SourceID(),
+           Data:       data,
+           SourceMetadata: &source_metadatapb.MetaData{
+               Data: &source_metadatapb.MetaData_Filesystem{
+                   Filesystem: &source_metadatapb.Filesystem{
+                       File: sanitizer.UTF8(path),
+                   },
+               },
+           },
+           Verify: s.verify,
+       }
        return nil
    })
10 changes: 5 additions & 5 deletions pkg/sources/git/git.go
@@ -27,7 +27,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"github.com/trufflesecurity/trufflehog/v3/pkg/gitparse"
"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
@@ -373,7 +372,7 @@ func (s *Git) ScanCommits(ctx context.Context, repo *git.Repository, path string,
            continue
        }

-       if diff.Content.Len() > common.ChunkSize+common.PeekSize {
+       if diff.Content.Len() > sources.ChunkSize+sources.PeekSize {
            s.gitChunk(diff, fileName, email, hash, when, urlMetadata, chunksChan)
            continue
        }
@@ -397,7 +396,7 @@ func (s *Git) gitChunk(diff gitparse.Diff, fileName, email, hash, when, urlMetadata string, chunksChan chan *sources.Chunk) {
    lastOffset := 0
    for offset := 0; originalChunk.Scan(); offset++ {
        line := originalChunk.Bytes()
-       if len(line) > common.ChunkSize || len(line)+newChunkBuffer.Len() > common.ChunkSize {
+       if len(line) > sources.ChunkSize || len(line)+newChunkBuffer.Len() > sources.ChunkSize {
            // Add oversize chunk info
            if newChunkBuffer.Len() > 0 {
                // Send the existing fragment.
@@ -413,7 +412,7 @@ func (s *Git) gitChunk(diff gitparse.Diff, fileName, email, hash, when, urlMetadata string, chunksChan chan *sources.Chunk) {
            newChunkBuffer.Reset()
            lastOffset = offset
        }
-       if len(line) > common.ChunkSize {
+       if len(line) > sources.ChunkSize {
            // Send the oversize line.
            metadata := s.sourceMetadataFunc(fileName, email, hash, when, urlMetadata, int64(diff.LineStart+offset))
            chunksChan <- &sources.Chunk{
@@ -816,11 +815,12 @@ func handleBinary(repo *git.Repository, chunksChan chan *sources.Chunk, chunkSkel *sources.Chunk,
    }
    reader.Stop()

-   chunk := *chunkSkel
    chunkData, err := io.ReadAll(reader)
    if err != nil {
        return err
    }

+   chunk := *chunkSkel
    chunk.Data = chunkData
    chunksChan <- &chunk

13 changes: 9 additions & 4 deletions pkg/sources/s3/s3.go
@@ -2,6 +2,7 @@ package s3

import (
    "fmt"
+   "io"
    "strings"
    "sync"
    "time"
@@ -295,11 +296,15 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan *sources.Chunk,
            }
            reader.Stop()

-           for chunkData := range common.ChunkReader(reader) {
-               chunk := *chunkSkel
-               chunk.Data = chunkData
-               chunksChan <- &chunk
-           }
+           chunk := *chunkSkel
+           chunkData, err := io.ReadAll(reader)
+           if err != nil {
+               log.WithError(err).Error("Could not read file data.")
+               return
+           }
+           chunk.Data = chunkData
+           chunksChan <- &chunk

            nErr, ok = errorCount.Load(prefix)
            if !ok {
                nErr = 0
