Skip to content

Commit

Permalink
Merge branch 'enrfilter' into updatep2p1
Browse files Browse the repository at this point in the history
  • Loading branch information
nibty committed Apr 10, 2024
2 parents 195e32f + 381ed18 commit d8c1849
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 28 deletions.
59 changes: 59 additions & 0 deletions common/gopool/pool.go
@@ -0,0 +1,59 @@
package gopool

import (
"github.com/panjf2000/ants/v2"
"runtime"
"time"
)

var (
// Init a instance pool when importing ants.
defaultPool, _ = ants.NewPool(ants.DefaultAntsPoolSize, ants.WithExpiryDuration(10*time.Second))
minNumberPerTask = 5
)

// Logger is used for logging formatted messages.
type Logger interface {
// Printf must have the same semantics as log.Printf.
Printf(format string, args ...interface{})
}

// Submit submits a task to pool.
func Submit(task func()) error {
return defaultPool.Submit(task)
}

// Running returns the number of the currently running goroutines.
func Running() int {
return defaultPool.Running()
}

// Cap returns the capacity of this default pool.
func Cap() int {
return defaultPool.Cap()
}

// Free returns the available goroutines to work.
func Free() int {
return defaultPool.Free()
}

// Release Closes the default pool.
func Release() {
defaultPool.Release()
}

// Reboot reboots the default pool.
func Reboot() {
defaultPool.Reboot()
}

func Threads(tasks int) int {
threads := tasks / minNumberPerTask
if threads > runtime.NumCPU() {
threads = runtime.NumCPU()
} else if threads == 0 {
threads = 1
}
return threads
}
4 changes: 4 additions & 0 deletions core/forkid/forkid.go
Expand Up @@ -111,6 +111,10 @@ func NewStaticFilter(config *params.ChainConfig, genesis common.Hash) Filter {
return newFilter(config, genesis, head)
}

func NewOperaFilter(config *params.ChainConfig, genesis common.Hash, headfn func() uint64) Filter {
return newFilter(config, genesis, headfn)
}

// newFilter is the internal version of NewFilter, taking closures as its arguments
// instead of a chain. The reason is to allow testing it without having to simulate
// an entire blockchain.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -74,4 +74,5 @@ require (
gopkg.in/urfave/cli.v1 v1.20.0
gopkg.in/yaml.v2 v2.4.0 // indirect
gotest.tools v2.2.0+incompatible // indirect
github.com/panjf2000/ants/v2 v2.4.5
)
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -335,6 +335,7 @@ github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFSt
github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/panjf2000/ants/v2 v2.4.5/go.mod h1:f6F0NZVFsGCp5A7QW/Zj/m92atWwOkY0OIhFxRNFr4A=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc=
github.com/peterh/liner v1.1.1-0.20190123174540-a2c9a5303de7 h1:oYW+YCJ1pachXTQmzR3rNLYGGz4g/UgFcjb28p/viDM=
Expand Down Expand Up @@ -640,6 +641,7 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
Expand Down
19 changes: 11 additions & 8 deletions p2p/discover/common.go
Expand Up @@ -35,20 +35,23 @@ type UDPConn interface {
LocalAddr() net.Addr
}

type NodeFilterFunc func(*enr.Record) bool

// Config holds settings for the discovery listener.
type Config struct {
// These settings are required and configure the UDP listener:
PrivateKey *ecdsa.PrivateKey

// These settings are optional:
NetRestrict *netutil.Netlist // list of allowed IP networks
IPRestrict []string // list of allowed IP addresses
PrivateNodes []*enode.Node // list of private enodes
Bootnodes []*enode.Node // list of bootstrap nodes
Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
Log log.Logger // if set, log messages go here
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
NetRestrict *netutil.Netlist // list of allowed IP networks
IPRestrict []string // list of allowed IP addresses
PrivateNodes []*enode.Node // list of private enodes
Bootnodes []*enode.Node // list of bootstrap nodes
Unhandled chan<- ReadPacket // unhandled packets are sent on this channel
Log log.Logger // if set, log messages go here
ValidSchemes enr.IdentityScheme // allowed identity schemes
Clock mclock.Clock
FilterFunction NodeFilterFunc // function for filtering ENR entries
}

func (cfg Config) withDefaults() Config {
Expand Down
56 changes: 51 additions & 5 deletions p2p/discover/table.go
Expand Up @@ -26,6 +26,7 @@ import (
crand "crypto/rand"
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/common/gopool"
mrand "math/rand"
"net"
"sort"
Expand Down Expand Up @@ -79,6 +80,8 @@ type Table struct {
closeReq chan struct{}
closed chan struct{}

enrFilter NodeFilterFunc

nodeAddedHook func(*node) // for testing
}

Expand All @@ -99,7 +102,7 @@ type bucket struct {
ips netutil.DistinctNetSet
}

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger) (*Table, error) {
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger, filter NodeFilterFunc) (*Table, error) {
tab := &Table{
net: t,
db: db,
Expand All @@ -110,6 +113,7 @@ func newTable(t transport, db *enode.DB, bootnodes []*enode.Node, log log.Logger
rand: mrand.New(mrand.NewSource(0)),
ips: netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
log: log,
enrFilter: filter,
}
if err := tab.setFallbackNodes(bootnodes); err != nil {
return nil, err
Expand Down Expand Up @@ -331,6 +335,12 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
if err != nil {
tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err)
} else {
if tab.enrFilter != nil {
if !tab.enrFilter(n.Record()) {
tab.log.Trace("ENR record filter out", "id", last.ID(), "addr", last.addr())
err = fmt.Errorf("filtered node")
}
}
last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks}
}
}
Expand Down Expand Up @@ -455,16 +465,27 @@ func (tab *Table) bucketAtDistance(d int) *bucket {
return tab.buckets[d-bucketMinDistance-1]
}

// addSeenNode adds a node which may or may not be live to the end of a bucket. If the
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
gopool.Submit(func() {
tab.addSeenNodeSync(n)
})
}

// addSeenNodeSync adds a node which may or may not be live to the end of a bucket. If the
// bucket has space available, adding the node succeeds immediately. Otherwise, the node is
// added to the replacements list.
//
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
func (tab *Table) addSeenNodeSync(n *node) {
if n.ID() == tab.self().ID() {
return
}

if tab.filterNode(n) {
return
}

tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
Expand All @@ -490,7 +511,28 @@ func (tab *Table) addSeenNode(n *node) {
}
}

// addVerifiedNode adds a node whose existence has been verified recently to the front of a
func (tab *Table) filterNode(n *node) bool {
if tab.enrFilter == nil {
return false
}
if node, err := tab.net.RequestENR(unwrapNode(n)); err != nil {
tab.log.Debug("ENR request failed", "id", n.ID(), "addr", n.addr(), "err", err)
return true
} else if !tab.enrFilter(node.Record()) {
tab.log.Trace("ENR record filter out", "id", n.ID(), "addr", n.addr(), "record", n.Record())
return true
}

return false
}

func (tab *Table) addVerifiedNode(n *node) {
gopool.Submit(func() {
tab.addVerifiedNodeSync(n)
})
}

// addVerifiedNodeSync adds a node whose existence has been verified recently to the front of a
// bucket. If the node is already in the bucket, it is moved to the front. If the bucket
// has no space, the node is added to the replacements list.
//
Expand All @@ -499,14 +541,18 @@ func (tab *Table) addSeenNode(n *node) {
// ping repeatedly.
//
// The caller must not hold tab.mutex.
func (tab *Table) addVerifiedNode(n *node) {
func (tab *Table) addVerifiedNodeSync(n *node) {
if !tab.isInitDone() {
return
}
if n.ID() == tab.self().ID() {
return
}

if tab.filterNode(n) {
return
}

tab.mutex.Lock()
defer tab.mutex.Unlock()
b := tab.bucket(n.ID())
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v4_udp.go
Expand Up @@ -147,7 +147,7 @@ func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) {
log: cfg.Log,
}

tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log, cfg.FilterFunction)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/discover/v5_udp.go
Expand Up @@ -169,7 +169,7 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
closeCtx: closeCtx,
cancelCloseCtx: cancelCloseCtx,
}
tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log)
tab, err := newTable(t, t.db, cfg.Bootnodes, cfg.Log, cfg.FilterFunction)
if err != nil {
return nil, err
}
Expand Down
52 changes: 39 additions & 13 deletions p2p/server.go
Expand Up @@ -23,6 +23,8 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/rlp"
"math/rand"
"net"
"sort"
Expand Down Expand Up @@ -197,6 +199,8 @@ type Server struct {
discmix *enode.FairMix
dialsched *dialScheduler

forkFilter forkid.Filter

// Channels into the run loop.
quit chan struct{}
addtrusted chan *enode.Node
Expand Down Expand Up @@ -593,6 +597,22 @@ func (srv *Server) setupDiscovery() error {
}
srv.localnode.SetFallbackUDP(realaddr.Port)

// ENR filter function
f := func(r *enr.Record) bool {
if srv.forkFilter == nil {
return true
}
var eth struct {
ForkID forkid.ID
Tail []rlp.RawValue `rlp:"tail"`
}
if r.Load(enr.WithEntry("opera", &eth)) != nil {
return false
}

return srv.forkFilter(eth.ForkID) == nil
}

// Discovery V4
var unhandled chan discover.ReadPacket
var sconn *sharedUDPConn
Expand All @@ -602,13 +622,14 @@ func (srv *Server) setupDiscovery() error {
sconn = &sharedUDPConn{conn, unhandled}
}
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
IPRestrict: srv.IPRestrict,
PrivateNodes: srv.PrivateNodes,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
Log: srv.log,
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
IPRestrict: srv.IPRestrict,
PrivateNodes: srv.PrivateNodes,
Bootnodes: srv.BootstrapNodes,
Unhandled: unhandled,
Log: srv.log,
FilterFunction: f,
}
ntab, err := discover.ListenV4(conn, srv.localnode, cfg)
if err != nil {
Expand All @@ -621,12 +642,13 @@ func (srv *Server) setupDiscovery() error {
// Discovery V5
if srv.DiscoveryV5 {
cfg := discover.Config{
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
IPRestrict: srv.IPRestrict,
PrivateNodes: srv.PrivateNodes,
Bootnodes: srv.BootstrapNodesV5,
Log: srv.log,
PrivateKey: srv.PrivateKey,
NetRestrict: srv.NetRestrict,
IPRestrict: srv.IPRestrict,
PrivateNodes: srv.PrivateNodes,
Bootnodes: srv.BootstrapNodesV5,
Log: srv.log,
FilterFunction: f,
}
var err error
if sconn != nil {
Expand Down Expand Up @@ -667,6 +689,10 @@ func (srv *Server) setupDialScheduler() {
}
}

func (srv *Server) SetFilter(f forkid.Filter) {
srv.forkFilter = f
}

func (srv *Server) maxInboundConns() int {
return srv.MaxPeers - srv.maxDialedConns()
}
Expand Down

0 comments on commit d8c1849

Please sign in to comment.