Fix batch operation issues
Signed-off-by: Kemal Akkoyun <kakkoyun@gmail.com>
kakkoyun committed May 3, 2022
1 parent cce312b commit f1949c2
Showing 6 changed files with 71 additions and 37 deletions.
1 change: 1 addition & 0 deletions .dockerignore
@@ -6,3 +6,4 @@ Dockerfile
 /images
 /.github
 /tmp
+*.bpf.o
2 changes: 2 additions & 0 deletions Makefile
@@ -106,6 +106,8 @@ bpf_compile_tools = $(CMD_LLC) $(CMD_CLANG)
 .PHONY: $(bpf_compile_tools)
 $(bpf_compile_tools): % : check_%
 
+# TODO(kakkoyun): To prevent an out-of-sync libbpf dependency, we might want to try directly linking/updating the submodule in libbpf-go.
+# - Determining the location of the Go module cache dir, initializing the submodule there, and linking it here should be doable.
 $(LIBBPF_SRC):
 	test -d $(LIBBPF_SRC) || (echo "missing libbpf source - maybe do 'git submodule init && git submodule update'" ; false)
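
The TODO above suggests resolving the libbpf submodule through the Go module cache. A rough sketch of that idea (not part of this commit): the cache root can be queried from the toolchain, and the module directory joined below is a hypothetical example — the exact module path and version are assumptions.

package main

import (
	"fmt"
	"os/exec"
	"path/filepath"
	"strings"
)

func main() {
	// `go env GOMODCACHE` prints the module cache root (GOPATH/pkg/mod by default).
	out, err := exec.Command("go", "env", "GOMODCACHE").Output()
	if err != nil {
		panic(err)
	}
	modCache := strings.TrimSpace(string(out))
	// Hypothetical: where a libbpf-go module could sit inside the cache; the
	// module path and version here are illustrative, not from this commit.
	fmt.Println(filepath.Join(modCache, "github.com", "aquasecurity", "libbpfgo@v0.2.4"))
}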

2 changes: 1 addition & 1 deletion deploy/dev.jsonnet
@@ -14,7 +14,7 @@ function(serverVersion='v0.4.2')
     version: serverVersion,
     replicas: 1,
     corsAllowedOrigins: '*',
-    logLevel: 'debug',
+    // logLevel: 'debug',
   });
 
 local agent = (import 'parca-agent/parca-agent.libsonnet')({
2 changes: 1 addition & 1 deletion deploy/tilt/parca-server-deployment.yaml
@@ -27,7 +27,7 @@ spec:
       - args:
         - /parca
         - --config-path=/var/parca/parca.yaml
-        - --log-level=debug
+        - --log-level=info
        - --cors-allowed-origins=*
         image: ghcr.io/parca-dev/parca:v0.10.0
         livenessProbe:
2 changes: 1 addition & 1 deletion pkg/agent/write_client.go
@@ -105,7 +105,7 @@ func (b *Batcher) batchLoop(ctx context.Context) error {
 	}
 
 	if len(batch) > 0 {
-		level.Debug(b.logger).Log("msg", "batch write client sent profiles", "count", len(batch))
+		level.Debug(b.logger).Log("msg", "batch write client has sent profiles", "count", len(batch))
 	}
 	return nil
 }
99 changes: 65 additions & 34 deletions pkg/profiler/profiler.go
@@ -79,9 +79,8 @@ type stackCountKey struct {
 }
 
 type bpfMaps struct {
-	counts      *bpf.BPFMap
-	stackTraces *bpf.BPFMap
-	countKeys   []stackCountKey
+	counts *bpf.BPFMap
+	traces *bpf.BPFMap
 }
 
 type metrics struct {
@@ -140,6 +139,7 @@ type CgroupProfiler struct {
 
 	bpfMaps   *bpfMaps
 	byteOrder binary.ByteOrder
+	countKeys []stackCountKey
 
 	lastError          error
 	lastProfileTakenAt time.Time
@@ -307,20 +307,20 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
 		return fmt.Errorf("get counts map: %w", err)
 	}
 
+	traces, err := m.GetMap("stack_traces")
+	if err != nil {
+		return fmt.Errorf("get stack traces map: %w", err)
+	}
+	p.bpfMaps = &bpfMaps{counts: counts, traces: traces}
+
 	// Allocate this here, so it's only allocated once instead of every
 	// time that p.profileLoop is called below. This is because, as of now,
 	// this slice will be around 122Kb. We allocate enough to read the entire
 	// map instead of using the batch iteration feature because it vastly
 	// simplifies the code in profileLoop and the batch operations are a bit tricky to get right.
 	// If allocating this much memory upfront is a problem we can always revisit and use
 	// smaller batch sizes.
-	countKeys := make([]stackCountKey, counts.GetMaxEntries())
-
-	stackTraces, err := m.GetMap("stack_traces")
-	if err != nil {
-		return fmt.Errorf("get stack traces map: %w", err)
-	}
-	p.bpfMaps = &bpfMaps{counts: counts, countKeys: countKeys, stackTraces: stackTraces}
+	p.countKeys = make([]stackCountKey, counts.GetMaxEntries())
 
 	ticker := time.NewTicker(p.profilingDuration)
 	defer ticker.Stop()
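
For context on the "around 122Kb" figure in the comment above: stackCountKey packs a uint32 PID and two int32 stack IDs into 12 bytes with no padding, so a counts map with, say, 10240 max entries (the actual map size is not shown in this diff and is assumed here) needs 10240 × 12 = 122,880 bytes for the key slice. A small sketch of that arithmetic, with the struct layout taken from the diff:

package main

import (
	"fmt"
	"unsafe"
)

type stackCountKey struct {
	PID           uint32
	UserStackID   int32
	KernelStackID int32
}

func main() {
	const maxEntries = 10240                       // assumed BPF map size; not part of this diff
	keySize := int(unsafe.Sizeof(stackCountKey{})) // 12 bytes: 4 + 4 + 4, no padding
	fmt.Printf("%d keys x %d bytes = %d bytes\n", maxEntries, keySize, maxEntries*keySize)
	// Output: 10240 keys x 12 bytes = 122880 bytes — the ~122Kb the comment mentions.
}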
@@ -343,7 +343,15 @@ func (p *CgroupProfiler) Run(ctx context.Context) error {
 	}
 }
 
-func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) (err error) {
+func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
+	p.mtx.Lock()
+	defer p.mtx.Unlock()
+
+	p.lastProfileTakenAt = lastProfileTakenAt
+	p.lastError = lastError
+}
+
+func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
 	var (
 		mappings      = maps.NewMapping(p.pidMappingFileCache)
 		kernelMapping = &profile.Mapping{
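
loopReport, moved up in this hunk, exists so the outcome of each pass can be read safely by other goroutines (e.g. a status endpoint) while the profile loop keeps running. A self-contained sketch of the same record-the-last-result-under-a-mutex pattern; the names below are illustrative, not from the commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

// reporter mirrors the lastProfileTakenAt/lastError bookkeeping guarded by p.mtx.
type reporter struct {
	mtx         sync.Mutex
	lastTakenAt time.Time
	lastErr     error
}

func (r *reporter) report(takenAt time.Time, err error) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.lastTakenAt = takenAt
	r.lastErr = err
}

func (r *reporter) last() (time.Time, error) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	return r.lastTakenAt, r.lastErr
}

func main() {
	r := &reporter{}
	r.report(time.Now(), nil) // what Run would do after each profileLoop pass
	t, err := r.last()        // what a status handler would read concurrently
	fmt.Println(t, err)
}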
@@ -357,23 +365,54 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
 		locationIndices = map[[2]uint64]int{} // [PID, Address] -> index in locations
 
 		// Variables needed for eBPF map batch iteration.
-		keysPtr = unsafe.Pointer(&p.bpfMaps.countKeys[0])
-		nextKey = uintptr(1)
+		countKeysPtr = unsafe.Pointer(&p.countKeys[0])
+		nextCountKey = uintptr(1)
 	)
 
-	memsetCountKeys(p.bpfMaps.countKeys, stackCountKey{})
+	// Reset count keys before collecting new traces from the kernel.
+	memsetCountKeys(p.countKeys, stackCountKey{})
 
-	counts := p.bpfMaps.counts
-	vals, err := counts.GetValueAndDeleteBatch(keysPtr, nil, unsafe.Pointer(&nextKey), counts.GetMaxEntries())
+	batchSize := 0
+	it := p.bpfMaps.counts.Iterator()
+	for it.Next() {
+		batchSize++
+	}
+	if err := it.Err(); err != nil {
+		return fmt.Errorf("iterate over counts map: %w", err)
+	}
+
+	if batchSize == 0 {
+		return nil
+	}
+	level.Debug(p.logger).Log("msg", "fetching stack trace counts in batch", "batchSize", batchSize)
+	time.Sleep(1 * time.Second)
+
+	var (
+		values [][]byte
+		err    error
+	)
+	values, err = p.bpfMaps.counts.GetValueAndDeleteBatch(countKeysPtr, nil, unsafe.Pointer(&nextCountKey), uint32(batchSize))
 	if err != nil {
-		// TODO(kakkoyun): Check if errors are correct!
-		if !errors.Is(err, syscall.ENOENT) { // Map is empty or we got all keys in the last batch.
-			return err
-		}
+		switch {
+		case errors.Is(err, syscall.EPERM):
+			level.Error(p.logger).Log("msg", "get value and delete batch: requested number of items is probably greater than existed", "err", err)
+			// return fmt.Errorf("get value and delete batch: requested number of items is probably greater than existed: %w", err)
+			return nil
+
+		case errors.Is(err, syscall.ENOENT):
+			level.Debug(p.logger).Log("msg", "no values in batch")
+			return nil
+
+		default:
+			return fmt.Errorf("get value and delete batch: %w", err)
+		}
 	}
+	if len(values) == 0 {
+		level.Debug(p.logger).Log("msg", "no values in batch")
+		return nil
+	}
 
-	// keys := p.bpfMaps.counts
-	for i, key := range p.bpfMaps.countKeys {
+	for i, key := range p.countKeys {
 		var (
 			pid           = key.PID
 			userStackID   = key.UserStackID
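
memsetCountKeys is called above but its body is outside this diff. A plausible implementation (an assumption, not the committed code) uses Go's doubling-copy idiom, so resetting the ~10k-entry slice costs O(log n) copy calls rather than a per-element loop:

package main

import "fmt"

type stackCountKey struct {
	PID           uint32
	UserStackID   int32
	KernelStackID int32
}

// memsetCountKeys sets every element of in to v by doubling the initialized
// prefix with copy, which the runtime turns into fast memmove calls.
func memsetCountKeys(in []stackCountKey, v stackCountKey) {
	if len(in) == 0 {
		return
	}
	in[0] = v
	for bp := 1; bp < len(in); bp *= 2 {
		copy(in[bp:], in[:bp])
	}
}

func main() {
	keys := make([]stackCountKey, 4)
	keys[2] = stackCountKey{PID: 42}
	memsetCountKeys(keys, stackCountKey{})
	fmt.Println(keys) // all entries reset to the zero key
}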
@@ -406,7 +445,7 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
 			continue
 		}
 
-		value := p.byteOrder.Uint64(vals[i])
+		value := p.byteOrder.Uint64(values[i])
 		if value == 0 {
 			// This should never happen, but it's here just in case.
 			// If we have a zero value, we don't want to add it to the profile.
@@ -492,14 +531,6 @@ func (p *CgroupProfiler) profileLoop(ctx context.Context, captureTime time.Time) error {
 	return nil
 }
 
-func (p *CgroupProfiler) loopReport(lastProfileTakenAt time.Time, lastError error) {
-	p.mtx.Lock()
-	defer p.mtx.Unlock()
-
-	p.lastProfileTakenAt = lastProfileTakenAt
-	p.lastError = lastError
-}
-
 func (p *CgroupProfiler) buildProfile(
 	ctx context.Context,
 	captureTime time.Time,
@@ -645,7 +676,7 @@ func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
 		return errors.New("user stack ID is 0, probably stack unwinding failed")
 	}
 
-	stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&userStackID))
+	stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&userStackID))
 	if err != nil {
 		p.metrics.missingStacks.WithLabelValues("user").Inc()
 		return fmt.Errorf("read user stack trace: %w", err)
@@ -655,7 +686,7 @@ func (p *CgroupProfiler) getAndDeleteUserStack(userStackID int32, stack *stack) error {
 		return fmt.Errorf("read user stack bytes, %s: %w", err, errUnrecoverable)
 	}
 
-	if err := p.bpfMaps.stackTraces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
+	if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&userStackID)); err != nil {
 		return fmt.Errorf("unable to delete stack trace key: %w", err)
 	}
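The stack_traces map read and deleted here is a BPF_MAP_TYPE_STACK_TRACE, whose values are fixed arrays of up to 127 zero-padded 8-byte frame addresses (the kernel's PERF_MAX_STACK_DEPTH). A hedged sketch of decoding such a value into usable addresses; the depth constant and byte order follow that kernel convention, not code shown in this diff:

package main

import (
	"encoding/binary"
	"fmt"
)

const stackDepth = 127 // kernel PERF_MAX_STACK_DEPTH

// decodeStack walks a raw stack_traces value and stops at the zero padding
// that marks the end of the recorded stack.
func decodeStack(stackBytes []byte, order binary.ByteOrder) []uint64 {
	addrs := make([]uint64, 0, stackDepth)
	for i := 0; i+8 <= len(stackBytes); i += 8 {
		addr := order.Uint64(stackBytes[i : i+8])
		if addr == 0 {
			break
		}
		addrs = append(addrs, addr)
	}
	return addrs
}

func main() {
	// A fake value: two frame addresses followed by zero padding.
	buf := make([]byte, stackDepth*8)
	binary.LittleEndian.PutUint64(buf[0:], 0xffffffff81000000)
	binary.LittleEndian.PutUint64(buf[8:], 0xffffffff81001234)
	fmt.Printf("%#x\n", decodeStack(buf, binary.LittleEndian))
}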

@@ -669,7 +700,7 @@ func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
 		return errors.New("kernel stack ID is 0, probably stack unwinding failed")
 	}
 
-	stackBytes, err := p.bpfMaps.stackTraces.GetValue(unsafe.Pointer(&kernelStackID))
+	stackBytes, err := p.bpfMaps.traces.GetValue(unsafe.Pointer(&kernelStackID))
 	if err != nil {
 		p.metrics.missingStacks.WithLabelValues("kernel").Inc()
 		return fmt.Errorf("read kernel stack trace: %w", err)
@@ -679,7 +710,7 @@ func (p *CgroupProfiler) getAndDeleteKernelStack(kernelStackID int32, stack *stack) error {
 		return fmt.Errorf("read kernel stack bytes, %s: %w", err, errUnrecoverable)
 	}
 
-	if err := p.bpfMaps.stackTraces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
+	if err := p.bpfMaps.traces.DeleteKey(unsafe.Pointer(&kernelStackID)); err != nil {
 		return fmt.Errorf("unable to delete stack trace key: %w", err)
 	}
 	return nil
