Skip to content

Commit

Permalink
Run chunker in pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
bill-rich committed Oct 24, 2022
1 parent fb56b9f commit f5b8a07
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 116 deletions.
30 changes: 0 additions & 30 deletions pkg/common/chunker.go
@@ -1,36 +1,6 @@
package common

import (
"bufio"
"errors"
"io"

log "github.com/sirupsen/logrus"
)

const (
	// ChunkSize is the number of bytes read into each chunk handed to
	// the detection pipeline.
	ChunkSize = 10 * 1024
	// PeekSize is the amount of lookahead from the following chunk that
	// is appended to each chunk, so that a secret spanning a chunk
	// boundary can still be matched.
	PeekSize = 3 * 1024
)

// ChunkReader reads r in ChunkSize pieces and sends them on the returned
// channel. Each emitted chunk carries up to PeekSize bytes of lookahead
// from the following chunk (the overlap is re-read as the start of the
// next chunk), so matches spanning a chunk boundary are not missed.
// The channel is closed once the reader is exhausted or a non-EOF read
// error occurs; the error itself is only logged.
func ChunkReader(r io.Reader) chan []byte {
	chunkChan := make(chan []byte)
	go func() {
		defer close(chunkChan)
		// A single buffered reader sized to ChunkSize is sufficient;
		// the previous bufio.NewReader(r) inner wrap only added an
		// extra 4 KiB buffer and copy per read.
		reader := bufio.NewReaderSize(r, ChunkSize)
		for {
			chunk := make([]byte, ChunkSize)
			n, err := reader.Read(chunk)
			if err != nil && !errors.Is(err, io.EOF) {
				log.WithError(err).Error("Error chunking reader.")
				break
			}
			// Peek does not advance the reader, so peekData is
			// seen again at the head of the next chunk.
			peekData, _ := reader.Peek(PeekSize)
			// Skip the empty chunk produced by the final
			// (0, io.EOF) read — downstream would scan it for
			// nothing.
			if n > 0 || len(peekData) > 0 {
				chunkChan <- append(chunk[:n], peekData...)
			}
			if errors.Is(err, io.EOF) {
				break
			}
		}
	}()
	return chunkChan
}
126 changes: 64 additions & 62 deletions pkg/engine/engine.go
Expand Up @@ -161,74 +161,76 @@ func (e *Engine) DetectorAvgTime() map[string][]time.Duration {
}

func (e *Engine) detectorWorker(ctx context.Context) {
for chunk := range e.chunks {
fragStart, mdLine := fragmentFirstLine(chunk)
for _, decoder := range e.decoders {
var decoderType detectorspb.DecoderType
switch decoder.(type) {
case *decoders.Plain:
decoderType = detectorspb.DecoderType_PLAIN
case *decoders.Base64:
decoderType = detectorspb.DecoderType_BASE64
default:
logrus.Warnf("unknown decoder type: %T", decoder)
decoderType = detectorspb.DecoderType_UNKNOWN
}
decoded := decoder.FromChunk(chunk)
if decoded == nil {
continue
}
dataLower := strings.ToLower(string(decoded.Data))
for verify, detectorsSet := range e.detectors {
for _, detector := range detectorsSet {
start := time.Now()
foundKeyword := false
for _, kw := range detector.Keywords() {
if strings.Contains(dataLower, strings.ToLower(kw)) {
foundKeyword = true
break
for originalChunk := range e.chunks {
for chunk := range sources.Chunker(originalChunk) {
fragStart, mdLine := fragmentFirstLine(chunk)
for _, decoder := range e.decoders {
var decoderType detectorspb.DecoderType
switch decoder.(type) {
case *decoders.Plain:
decoderType = detectorspb.DecoderType_PLAIN
case *decoders.Base64:
decoderType = detectorspb.DecoderType_BASE64
default:
logrus.Warnf("unknown decoder type: %T", decoder)
decoderType = detectorspb.DecoderType_UNKNOWN
}
decoded := decoder.FromChunk(chunk)
if decoded == nil {
continue
}
dataLower := strings.ToLower(string(decoded.Data))
for verify, detectorsSet := range e.detectors {
for _, detector := range detectorsSet {
start := time.Now()
foundKeyword := false
for _, kw := range detector.Keywords() {
if strings.Contains(dataLower, strings.ToLower(kw)) {
foundKeyword = true
break
}
}
if !foundKeyword {
continue
}
}
if !foundKeyword {
continue
}

results, err := func() ([]detectors.Result, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
defer common.Recover(ctx)
return detector.FromData(ctx, verify, decoded.Data)
}()
if err != nil {
logrus.WithFields(logrus.Fields{
"source_type": decoded.SourceType.String(),
"metadata": decoded.SourceMetadata,
}).WithError(err).Error("could not scan chunk")
continue
}

for _, result := range results {
if isGitSource(chunk.SourceType) {
offset := FragmentLineOffset(chunk, &result)
*mdLine = fragStart + offset
results, err := func() ([]detectors.Result, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
defer common.Recover(ctx)
return detector.FromData(ctx, verify, decoded.Data)
}()
if err != nil {
logrus.WithFields(logrus.Fields{
"source_type": decoded.SourceType.String(),
"metadata": decoded.SourceMetadata,
}).WithError(err).Error("could not scan chunk")
continue
}
result.DecoderType = decoderType
e.results <- detectors.CopyMetadata(chunk, result)

}
if len(results) > 0 {
elapsed := time.Since(start)
detectorName := results[0].DetectorType.String()
avgTimeI, ok := e.detectorAvgTime.Load(detectorName)
var avgTime []time.Duration
if ok {
avgTime, ok = avgTimeI.([]time.Duration)
if !ok {
continue
for _, result := range results {
if isGitSource(chunk.SourceType) {
offset := FragmentLineOffset(chunk, &result)
*mdLine = fragStart + offset
}
result.DecoderType = decoderType
e.results <- detectors.CopyMetadata(chunk, result)

}
if len(results) > 0 {
elapsed := time.Since(start)
detectorName := results[0].DetectorType.String()
avgTimeI, ok := e.detectorAvgTime.Load(detectorName)
var avgTime []time.Duration
if ok {
avgTime, ok = avgTimeI.([]time.Duration)
if !ok {
continue
}
}
avgTime = append(avgTime, elapsed)
e.detectorAvgTime.Store(detectorName, avgTime)
}
avgTime = append(avgTime, elapsed)
e.detectorAvgTime.Store(detectorName, avgTime)
}
}
}
Expand Down
30 changes: 15 additions & 15 deletions pkg/sources/filesystem/filesystem.go
Expand Up @@ -13,7 +13,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/source_metadatapb"
Expand Down Expand Up @@ -140,22 +139,23 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
return err
}
reReader.Stop()

for chunkData := range common.ChunkReader(reReader) {
chunksChan <- &sources.Chunk{
SourceType: s.Type(),
SourceName: s.name,
SourceID: s.SourceID(),
Data: chunkData,
SourceMetadata: &source_metadatapb.MetaData{
Data: &source_metadatapb.MetaData_Filesystem{
Filesystem: &source_metadatapb.Filesystem{
File: sanitizer.UTF8(path),
},
data, err := io.ReadAll(reReader)
if err != nil {
return err
}
chunksChan <- &sources.Chunk{
SourceType: s.Type(),
SourceName: s.name,
SourceID: s.SourceID(),
Data: data,
SourceMetadata: &source_metadatapb.MetaData{
Data: &source_metadatapb.MetaData_Filesystem{
Filesystem: &source_metadatapb.Filesystem{
File: sanitizer.UTF8(path),
},
},
Verify: s.verify,
}
},
Verify: s.verify,
}
return nil
})
Expand Down
12 changes: 7 additions & 5 deletions pkg/sources/git/git.go
Expand Up @@ -2,6 +2,7 @@ package git

import (
"fmt"
"io"
"io/ioutil"
"net/url"
"os"
Expand All @@ -24,7 +25,6 @@ import (
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"github.com/trufflesecurity/trufflehog/v3/pkg/gitparse"
"github.com/trufflesecurity/trufflehog/v3/pkg/handlers"
Expand Down Expand Up @@ -753,11 +753,13 @@ func handleBinary(repo *git.Repository, chunksChan chan *sources.Chunk, chunkSke
}
reader.Stop()

for chunkData := range common.ChunkReader(reader) {
chunk := *chunkSkel
chunk.Data = chunkData
chunksChan <- &chunk
chunkData, err := io.ReadAll(reader)
if err != nil {
return err
}
chunk := *chunkSkel
chunk.Data = chunkData
chunksChan <- &chunk

return nil
}
13 changes: 9 additions & 4 deletions pkg/sources/s3/s3.go
Expand Up @@ -2,6 +2,7 @@ package s3

import (
"fmt"
"io"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -295,11 +296,15 @@ func (s *Source) pageChunker(ctx context.Context, client *s3.S3, chunksChan chan
}
reader.Stop()

for chunkData := range common.ChunkReader(reader) {
chunk := *chunkSkel
chunk.Data = chunkData
chunksChan <- &chunk
chunk := *chunkSkel
chunkData, err := io.ReadAll(reader)
if err != nil {
log.WithError(err).Error("Could not read file data.")
return
}
chunk.Data = chunkData
chunksChan <- &chunk

nErr, ok = errorCount.Load(prefix)
if !ok {
nErr = 0
Expand Down
35 changes: 35 additions & 0 deletions pkg/sources/sources.go
@@ -1,10 +1,15 @@
package sources

import (
"bufio"
"bytes"
"errors"
"io"
"sync"

"google.golang.org/protobuf/types/known/anypb"

"github.com/sirupsen/logrus"
"github.com/trufflesecurity/trufflehog/v3/pkg/common"
"github.com/trufflesecurity/trufflehog/v3/pkg/context"
"github.com/trufflesecurity/trufflehog/v3/pkg/pb/source_metadatapb"
Expand Down Expand Up @@ -143,3 +148,33 @@ func (p *Progress) GetProgress() *Progress {
defer p.mut.Unlock()
return p
}

// Chunker takes a chunk and splits it into chunks of common.ChunkSize.
func Chunker(originalChunk *Chunk) chan *Chunk {
chunkChan := make(chan *Chunk)
go func() {
defer close(chunkChan)
if len(originalChunk.Data) <= common.ChunkSize+common.PeekSize {
chunkChan <- originalChunk
return
}
r := bytes.NewReader(originalChunk.Data)
reader := bufio.NewReaderSize(bufio.NewReader(r), common.ChunkSize)
for {
chunkBytes := make([]byte, common.ChunkSize)
chunk := *originalChunk
n, err := reader.Read(chunkBytes)
if err != nil && !errors.Is(err, io.EOF) {
logrus.WithError(err).Error("Error chunking reader.")
break
}
peekData, _ := reader.Peek(common.PeekSize)
chunk.Data = append(chunkBytes[:n], peekData...)
chunkChan <- &chunk
if errors.Is(err, io.EOF) {
break
}
}
}()
return chunkChan
}

0 comments on commit f5b8a07

Please sign in to comment.