Skip to content

Commit

Permalink
rever: falling edge
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Oct 20, 2022
1 parent 081e428 commit 5e7a207
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 6 deletions.
9 changes: 9 additions & 0 deletions pkg/flipflop/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package flipflop exposes a buffered input functionality
// that mimicks the behavior of falling edge detection
// which is done when doing signal processing on digital
// or analog electric circuitry.
package flipflop
68 changes: 68 additions & 0 deletions pkg/flipflop/falling_edge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package flipflop

import (
"time"
)

type detector struct {
t time.Duration
worstCase time.Duration

buf chan struct{}
out chan struct{}
quit chan struct{}
}

// NewFallingEdge returns a new falling edge detector.
// bufferTime is the time to buffer, worstCase is buffertime*worstcase time to wait before writing
// to the output anyway.
func NewFallingEdge(bufferTime, worstCase time.Duration) (in chan<- struct{}, out <-chan struct{}, clean func()) {
d := &detector{
t: bufferTime,
worstCase: worstCase,
buf: make(chan struct{}, 1),
out: make(chan struct{}),
quit: make(chan struct{}),
}

go d.work()

return d.buf, d.out, func() { close(d.quit) }
}

func (d *detector) work() {
var waitWrite <-chan time.Time
var worstCase <-chan time.Time
for {
select {
case <-d.quit:
return
case <-d.buf:
// we have an item in the buffer, dont announce yet
waitWrite = time.After(d.t)
if worstCase == nil {
worstCase = time.After(d.worstCase)
}
case <-waitWrite:
select {
case d.out <- struct{}{}:
case <-d.quit:
return
}
worstCase = nil
waitWrite = nil
case <-worstCase:
select {
case d.out <- struct{}{}:
case <-d.quit:
return
}
worstCase = nil
waitWrite = nil
}
}
}
122 changes: 122 additions & 0 deletions pkg/flipflop/falling_edge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2021 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package flipflop_test

import (
"testing"
"time"

"github.com/ethersphere/bee/pkg/flipflop"
)

func TestFallingEdge(t *testing.T) {
t.Parallel()
t.Skip("github actions")

ok := make(chan struct{})
tt := 50 * time.Millisecond
worst := 5 * tt
in, c, cleanup := flipflop.NewFallingEdge(tt, worst)
defer cleanup()
go func() {
select {
case <-c:
close(ok)
return
case <-time.After(100 * time.Millisecond):
t.Error("timed out")
}
}()

in <- struct{}{}

select {
case <-ok:
case <-time.After(1 * time.Second):
t.Fatal("timed out")
}
}

func TestFallingEdgeBuffer(t *testing.T) {
t.Parallel()
t.Skip("needs parameter tweaking on github actions")

ok := make(chan struct{})
tt := 150 * time.Millisecond
worst := 9 * tt
in, c, cleanup := flipflop.NewFallingEdge(tt, worst)
defer cleanup()
sleeps := 5
wait := 50 * time.Millisecond

start := time.Now()
online := make(chan struct{})
go func() {
close(online)
select {
case <-c:
if time.Since(start) <= 450*time.Millisecond {
t.Errorf("wrote too early %v", time.Since(start))
}
close(ok)
return
case <-time.After(1000 * time.Millisecond):
t.Error("timed out")
}
}()

// wait for goroutine to be scheduled
<-online

for i := 0; i < sleeps; i++ {
in <- struct{}{}
time.Sleep(wait)
}
select {
case <-ok:
case <-time.After(1 * time.Second):
t.Fatal("timed out")
}
}

func TestFallingEdgeWorstCase(t *testing.T) {
t.Parallel()
t.Skip("github actions")

ok := make(chan struct{})
tt := 100 * time.Millisecond
worst := 5 * tt
in, c, cleanup := flipflop.NewFallingEdge(tt, worst)
defer cleanup()
sleeps := 9
wait := 80 * time.Millisecond

start := time.Now()

go func() {
select {
case <-c:
if time.Since(start) >= 550*time.Millisecond {
t.Errorf("wrote too early %v", time.Since(start))
}

close(ok)
return
case <-time.After(1000 * time.Millisecond):
t.Error("timed out")
}
}()
go func() {
for i := 0; i < sleeps; i++ {
in <- struct{}{}
time.Sleep(wait)
}
}()
select {
case <-ok:
case <-time.After(1 * time.Second):
t.Fatal("timed out")
}
}
5 changes: 5 additions & 0 deletions pkg/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ var (
// Limit the number of goroutines created by Getters
// that call updateGC function. Value 0 sets no limit.
maxParallelUpdateGC = 1000

// values needed to adjust subscription trigger
// buffer time.
flipFlopBufferDuration = 150 * time.Millisecond
flipFlopWorstCaseDuration = 10 * time.Second
)

const (
Expand Down
16 changes: 10 additions & 6 deletions pkg/localstore/subscription_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/ethersphere/bee/pkg/flipflop"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
Expand All @@ -43,16 +44,18 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)

chunkDescriptors := make(chan storage.Descriptor)

trigger := make(chan struct{}, 1)
trigger <- struct{}{}
in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration)

db.pullTriggersMu.Lock()
if _, ok := db.pullTriggers[bin]; !ok {
db.pullTriggers[bin] = make([]chan<- struct{}, 0)
}
db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
db.pullTriggers[bin] = append(db.pullTriggers[bin], in)
db.pullTriggersMu.Unlock()

// send signal for the initial iteration
in <- struct{}{}

stopChan := make(chan struct{})
var stopChanOnce sync.Once

Expand All @@ -62,6 +65,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)

db.subscriptionsWG.Add(1)
go func() {
defer clean()
defer db.subscriptionsWG.Done()
defer db.metrics.SubscribePullStop.Inc()
// close the returned store.Descriptor channel at the end to
Expand All @@ -79,7 +83,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
first := true // first iteration flag for SkipStartFromItem
for {
select {
case <-trigger:
case <-out:
// iterate until:
// - last index Item is reached
// - subscription stop is called
Expand Down Expand Up @@ -167,9 +171,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
defer db.pullTriggersMu.Unlock()

for i, t := range db.pullTriggers[bin] {
if t == trigger {
if t == in {
db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
return
break
}
}
}
Expand Down

0 comments on commit 5e7a207

Please sign in to comment.