Skip to content

Commit

Permalink
Rate limit blocks submitted to the relay and DB (ethereum#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruteri authored and avalonche committed Mar 9, 2023
1 parent 01d38b5 commit f3227d0
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 19 deletions.
92 changes: 92 additions & 0 deletions builder/block_submission_rate_limiter.go
@@ -0,0 +1,92 @@
package builder

import (
"context"
"sync/atomic"
"time"

"github.com/ethereum/go-ethereum/core/types"
)

type blockRateLimitSubmission struct {
resultCh chan bool
block *types.Block
}

type BlockSubmissionRateLimiter struct {
submissionsCh chan blockRateLimitSubmission
started uint32
ctx context.Context
cancel context.CancelFunc
}

func NewBlockSubmissionRateLimiter() *BlockSubmissionRateLimiter {
ctx, cancel := context.WithCancel(context.Background())
r := &BlockSubmissionRateLimiter{
submissionsCh: make(chan blockRateLimitSubmission),
started: uint32(0),
ctx: ctx,
cancel: cancel,
}

return r
}
func (r *BlockSubmissionRateLimiter) Limit(block *types.Block) chan bool {
resultCh := make(chan bool, 1)
if atomic.LoadUint32(&r.started) != 1 {
resultCh <- true
return resultCh
}

select {
case r.submissionsCh <- blockRateLimitSubmission{
resultCh: resultCh,
block: block,
}:
case <-r.ctx.Done():
resultCh <- true
}
return resultCh
}

func (r *BlockSubmissionRateLimiter) Start() {
if !atomic.CompareAndSwapUint32(&r.started, 0, 1) {
return
}

go r.rateLimit()
}

func (r *BlockSubmissionRateLimiter) rateLimit() {
for r.ctx.Err() == nil {
// Beginning of the rate limit bucket
bestSubmission := <-r.submissionsCh

bucketCutoffCh := time.After(100 * time.Millisecond)

bucketClosed := false
for !bucketClosed {
select {
case <-r.ctx.Done():
bucketClosed = true
break
case <-bucketCutoffCh:
bucketClosed = true
break
case newSubmission := <-r.submissionsCh:
if bestSubmission.block.Profit.Cmp(newSubmission.block.Profit) < 0 {
bestSubmission.resultCh <- false
bestSubmission = newSubmission
} else {
newSubmission.resultCh <- false
}
}
}

bestSubmission.resultCh <- true
}
}

func (r *BlockSubmissionRateLimiter) Stop() {
r.cancel()
}
72 changes: 72 additions & 0 deletions builder/block_submission_rate_limiter_test.go
@@ -0,0 +1,72 @@
package builder

import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)

func TestLimit(t *testing.T) {
rl := NewBlockSubmissionRateLimiter()

// Check that before starting requests are passed through
ch1 := rl.Limit(&types.Block{Profit: new(big.Int)})
ch2 := rl.Limit(&types.Block{Profit: new(big.Int)})
ch3 := rl.Limit(&types.Block{Profit: new(big.Int)})

time.Sleep(200 * time.Millisecond)

for _, ch := range []chan bool{ch1, ch2, ch3} {
select {
case shouldSubmit := <-ch:
require.True(t, shouldSubmit)
default:
t.Error("chan was not ready")
}
}

// Check that after starting requests are rate limited
rl.Start()

// Check that before starting requests are passed through
ch1 = rl.Limit(&types.Block{Profit: new(big.Int)})
ch2 = rl.Limit(&types.Block{Profit: new(big.Int)})
ch3 = rl.Limit(&types.Block{Profit: big.NewInt(1)})

time.Sleep(200 * time.Millisecond)

for _, ch := range []chan bool{ch1, ch2, ch3} {
select {
case shouldSubmit := <-ch:
if ch == ch3 {
require.True(t, shouldSubmit)
} else {
require.False(t, shouldSubmit)
}
default:
t.Error("chan was not ready")
}
}

// Check that after stopping requests are passed through
rl.Stop()

ch1 = rl.Limit(&types.Block{Profit: new(big.Int)})
ch2 = rl.Limit(&types.Block{Profit: new(big.Int)})
ch3 = rl.Limit(&types.Block{Profit: new(big.Int)})

time.Sleep(200 * time.Millisecond)

for _, ch := range []chan bool{ch1, ch2, ch3} {
select {
case shouldSubmit := <-ch:
require.True(t, shouldSubmit)
default:
t.Error("chan was not ready")
}
}

}
55 changes: 41 additions & 14 deletions builder/builder.go
Expand Up @@ -38,14 +38,17 @@ type IRelay interface {

type IBuilder interface {
OnPayloadAttribute(attrs *BuilderPayloadAttributes) error
Start() error
Stop() error
}

type Builder struct {
ds IDatabaseService
beaconClient IBeaconClient
relay IRelay
eth IEthereumService
resubmitter Resubmitter
ds IDatabaseService
beaconClient IBeaconClient
relay IRelay
eth IEthereumService
resubmitter Resubmitter
blockSubmissionRateLimiter *BlockSubmissionRateLimiter

builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
Expand All @@ -62,19 +65,30 @@ func NewBuilder(sk *bls.SecretKey, ds IDatabaseService, bc IBeaconClient, relay
pk.FromSlice(pkBytes)

return &Builder{
ds: ds,
beaconClient: bc,
relay: relay,
eth: eth,
resubmitter: Resubmitter{},
builderSecretKey: sk,
builderPublicKey: pk,
ds: ds,
beaconClient: bc,
relay: relay,
eth: eth,
resubmitter: Resubmitter{},
blockSubmissionRateLimiter: NewBlockSubmissionRateLimiter(),
builderSecretKey: sk,
builderPublicKey: pk,

builderSigningDomain: builderSigningDomain,
bestBlockProfit: big.NewInt(0),
}
}

func (b *Builder) Start() error {
b.blockSubmissionRateLimiter.Start()
return nil
}

func (b *Builder) Stop() error {
b.blockSubmissionRateLimiter.Stop()
return nil
}

func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error {
b.bestMu.Lock()
defer b.bestMu.Unlock()
Expand Down Expand Up @@ -116,6 +130,8 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun
Value: *value,
}

go b.ds.ConsumeBuiltBlock(block, bundles, &blockBidMsg)

signature, err := boostTypes.SignMessage(&blockBidMsg, b.builderSigningDomain, b.builderSecretKey)
if err != nil {
log.Error("could not sign builder bid", "err", err)
Expand All @@ -134,9 +150,9 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun
return err
}

b.bestBlockProfit.Set(block.Profit)
log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg)

go b.ds.ConsumeBuiltBlock(block, bundles, &blockBidMsg)
b.bestBlockProfit.Set(block.Profit)
return nil
}

Expand Down Expand Up @@ -171,13 +187,24 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error {
}

blockHook := func(block *types.Block, bundles []types.SimulatedBundle) {
select {
case shouldSubmit := <-b.blockSubmissionRateLimiter.Limit(block):
if !shouldSubmit {
log.Info("Block rate limited", "blochHash", block.Hash())
return
}
case <-time.After(200 * time.Millisecond):
log.Info("Block rate limit timeout, submitting the block anyway")
}

err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs)
if err != nil {
log.Error("could not run sealed block hook", "err", err)
}
}

firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error {
log.Info("Resubmitting build job")
return b.eth.BuildBlock(attrs, blockHook)
})

Expand Down
5 changes: 4 additions & 1 deletion builder/builder_test.go
Expand Up @@ -75,8 +75,11 @@ func TestOnPayloadAttributes(t *testing.T) {
testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock}

builder := NewBuilder(sk, NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService)
builder.Start()
defer builder.Stop()

builder.OnPayloadAttribute(testPayloadAttributes)
err = builder.OnPayloadAttribute(testPayloadAttributes)
require.NoError(t, err)

require.NotNil(t, testRelay.submittedMsg)
expectedProposerPubkey, err := boostTypes.HexToPubkey(testBeacon.validator.Pk.String())
Expand Down
2 changes: 0 additions & 2 deletions builder/relay.go
Expand Up @@ -150,8 +150,6 @@ func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest) err
return fmt.Errorf("non-ok response code %d from relay ", code)
}

log.Info("submitted block", "msg", msg)

if r.localRelay != nil {
r.localRelay.SubmitBlock(msg)
}
Expand Down
18 changes: 16 additions & 2 deletions builder/service.go
Expand Up @@ -41,11 +41,23 @@ type Service struct {
builder IBuilder
}

func (s *Service) Start() {
func (s *Service) Start() error {
if s.srv != nil {
log.Info("Service started")
go s.srv.ListenAndServe()
}

s.builder.Start()

return nil
}

func (s *Service) Stop() error {
if s.srv != nil {
s.srv.Close()
}
s.builder.Stop()
return nil
}

func (s *Service) PayloadAttributes(payloadAttributes *BuilderPayloadAttributes) error {
Expand Down Expand Up @@ -170,7 +182,6 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error

builderBackend := NewBuilder(builderSk, ds, beaconClient, relay, builderSigningDomain, ethereumService)
builderService := NewService(cfg.ListenAddr, localRelay, builderBackend)
builderService.Start()

stack.RegisterAPIs([]rpc.API{
{
Expand All @@ -181,5 +192,8 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error
Authenticated: true,
},
})

stack.RegisterLifecycle(builderService)

return nil
}

0 comments on commit f3227d0

Please sign in to comment.