Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: msgIdGenerator #468

Merged
merged 10 commits into from Jan 23, 2022
8 changes: 4 additions & 4 deletions gossip_tracer.go
Expand Up @@ -15,7 +15,7 @@ import (
type gossipTracer struct {
sync.Mutex

msgID MsgIdFunction
idGen *msgIDGenerator

followUpTime time.Duration

Expand All @@ -29,7 +29,7 @@ type gossipTracer struct {

func newGossipTracer() *gossipTracer {
return &gossipTracer{
msgID: DefaultMsgIdFn,
idGen: newMsgIdGenerator(),
vyzo marked this conversation as resolved.
Show resolved Hide resolved
promises: make(map[string]map[peer.ID]time.Time),
peerPromises: make(map[peer.ID]map[string]struct{}),
}
Expand All @@ -40,7 +40,7 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) {
return
}

gt.msgID = gs.p.msgID
gt.idGen = gs.p.idGen
gt.followUpTime = gs.params.IWantFollowupTime
}

Expand Down Expand Up @@ -117,7 +117,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int {
var _ RawTracer = (*gossipTracer)(nil)

func (gt *gossipTracer) fulfillPromise(msg *Message) {
mid := gt.msgID(msg.Message)
mid := gt.idGen.ID(msg)

gt.Lock()
defer gt.Unlock()
Expand Down
8 changes: 4 additions & 4 deletions gossipsub.go
Expand Up @@ -295,7 +295,7 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt
ps.tracer = &pubsubTracer{
raw: []RawTracer{gs.score, gs.gossipTracer},
pid: ps.host.ID(),
msgID: ps.msgID,
idGen: ps.idGen,
}
}

Expand Down Expand Up @@ -484,7 +484,7 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
gs.tagTracer.Start(gs)

// start using the same msg ID function as PubSub for caching messages.
gs.mcache.SetMsgIdFn(p.msgID)
gs.mcache.SetMsgIdFn(p.idGen.ID)

// start the heartbeat
go gs.heartbeatTimer()
Expand Down Expand Up @@ -705,7 +705,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb.
continue
}

ihave[mid] = msg
ihave[mid] = msg.Message
}
}

Expand Down Expand Up @@ -954,7 +954,7 @@ func (gs *GossipSubRouter) connector() {
}

func (gs *GossipSubRouter) Publish(msg *Message) {
gs.mcache.Put(msg.Message)
gs.mcache.Put(msg)

from := msg.ReceivedFrom
topic := msg.GetTopic()
Expand Down
20 changes: 10 additions & 10 deletions mcache.go
Expand Up @@ -3,8 +3,6 @@ package pubsub
import (
"fmt"

pb "github.com/libp2p/go-libp2p-pubsub/pb"

"github.com/libp2p/go-libp2p-core/peer"
)

Expand All @@ -27,23 +25,25 @@ func NewMessageCache(gossip, history int) *MessageCache {
panic(err)
}
return &MessageCache{
msgs: make(map[string]*pb.Message),
msgs: make(map[string]*Message),
peertx: make(map[string]map[peer.ID]int),
history: make([][]CacheEntry, history),
gossip: gossip,
msgID: DefaultMsgIdFn,
msgID: func(msg *Message) string {
return DefaultMsgIdFn(msg.Message)
},
}
}

type MessageCache struct {
msgs map[string]*pb.Message
msgs map[string]*Message
peertx map[string]map[peer.ID]int
history [][]CacheEntry
gossip int
msgID MsgIdFunction
msgID func(*Message) string
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}

func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction) {
func (mc *MessageCache) SetMsgIdFn(msgID func(*Message) string) {
vyzo marked this conversation as resolved.
Show resolved Hide resolved
mc.msgID = msgID
}

Expand All @@ -52,18 +52,18 @@ type CacheEntry struct {
topic string
}

func (mc *MessageCache) Put(msg *pb.Message) {
func (mc *MessageCache) Put(msg *Message) {
mid := mc.msgID(msg)
mc.msgs[mid] = msg
mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topic: msg.GetTopic()})
}

func (mc *MessageCache) Get(mid string) (*pb.Message, bool) {
func (mc *MessageCache) Get(mid string) (*Message, bool) {
m, ok := mc.msgs[mid]
return m, ok
}

func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*pb.Message, int, bool) {
func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*Message, int, bool) {
m, ok := mc.msgs[mid]
if !ok {
return nil, 0, false
Expand Down
18 changes: 9 additions & 9 deletions mcache_test.go
Expand Up @@ -18,7 +18,7 @@ func TestMessageCache(t *testing.T) {
}

for i := 0; i < 10; i++ {
mcache.Put(msgs[i])
mcache.Put(&Message{Message: msgs[i]})
vyzo marked this conversation as resolved.
Show resolved Hide resolved
}

for i := 0; i < 10; i++ {
Expand All @@ -28,7 +28,7 @@ func TestMessageCache(t *testing.T) {
t.Fatalf("Message %d not in cache", i)
}

if m != msgs[i] {
if m.Message != msgs[i] {
t.Fatalf("Message %d does not match cache", i)
}
}
Expand All @@ -47,7 +47,7 @@ func TestMessageCache(t *testing.T) {

mcache.Shift()
for i := 10; i < 20; i++ {
mcache.Put(msgs[i])
mcache.Put(&Message{Message: msgs[i]})
}

for i := 0; i < 20; i++ {
Expand All @@ -57,7 +57,7 @@ func TestMessageCache(t *testing.T) {
t.Fatalf("Message %d not in cache", i)
}

if m != msgs[i] {
if m.Message != msgs[i] {
t.Fatalf("Message %d does not match cache", i)
}
}
Expand All @@ -83,22 +83,22 @@ func TestMessageCache(t *testing.T) {

mcache.Shift()
for i := 20; i < 30; i++ {
mcache.Put(msgs[i])
mcache.Put(&Message{Message: msgs[i]})
}

mcache.Shift()
for i := 30; i < 40; i++ {
mcache.Put(msgs[i])
mcache.Put(&Message{Message: msgs[i]})
}

mcache.Shift()
for i := 40; i < 50; i++ {
mcache.Put(msgs[i])
mcache.Put(&Message{Message: msgs[i]})
}

mcache.Shift()
for i := 50; i < 60; i++ {
mcache.Put(msgs[i])
mcache.Put(&Message{Message: msgs[i]})
}

if len(mcache.msgs) != 50 {
Expand All @@ -120,7 +120,7 @@ func TestMessageCache(t *testing.T) {
t.Fatalf("Message %d not in cache", i)
}

if m != msgs[i] {
if m.Message != msgs[i] {
t.Fatalf("Message %d does not match cache", i)
}
}
Expand Down
52 changes: 52 additions & 0 deletions midgen.go
@@ -0,0 +1,52 @@
package pubsub

import (
"sync"

pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// msgIDGenerator handles computing IDs for msgs
// It allows setting custom generators(MsgIdFunction) per topic
type msgIDGenerator struct {
Default MsgIdFunction

topicGensLk sync.RWMutex
vyzo marked this conversation as resolved.
Show resolved Hide resolved
topicGens map[string]MsgIdFunction
}

func newMsgIdGenerator() *msgIDGenerator {
return &msgIDGenerator{
Default: DefaultMsgIdFn,
topicGens: make(map[string]MsgIdFunction),
}
}

// Set sets custom id generator(MsgIdFunction) for topic.
func (m *msgIDGenerator) Set(topic string, gen MsgIdFunction) {
m.topicGensLk.Lock()
m.topicGens[topic] = gen
m.topicGensLk.Unlock()
}

// ID computes ID for the msg or short-circuits with the cached value.
func (m *msgIDGenerator) ID(msg *Message) string {
if msg.ID != "" {
return msg.ID
}

msg.ID = m.RawID(msg.Message)
return msg.ID
}

// RawID computes ID for the proto 'msg'.
func (m *msgIDGenerator) RawID(msg *pb.Message) string {
m.topicGensLk.RLock()
gen, ok := m.topicGens[msg.GetTopic()]
m.topicGensLk.RUnlock()
if !ok {
gen = m.Default
}

return gen(msg)
}
2 changes: 1 addition & 1 deletion peer_gater.go
Expand Up @@ -182,7 +182,7 @@ func WithPeerGater(params *PeerGaterParams) Option {
ps.tracer = &pubsubTracer{
raw: []RawTracer{gs.gate},
pid: ps.host.ID(),
msgID: ps.msgID,
idGen: ps.idGen,
}
}

Expand Down
32 changes: 18 additions & 14 deletions pubsub.go
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"

logging "github.com/ipfs/go-log"
timecache "github.com/whyrusleeping/timecache"
"github.com/whyrusleeping/timecache"
)

// DefaultMaximumMessageSize is 1mb.
Expand Down Expand Up @@ -147,8 +147,8 @@ type PubSub struct {
seenMessagesMx sync.Mutex
seenMessages *timecache.TimeCache

// function used to compute the ID for a message
msgID MsgIdFunction
// generator used to compute the ID for a message
idGen *msgIDGenerator

// key for signing messages; nil when signing is disabled
signKey crypto.PrivKey
Expand Down Expand Up @@ -213,6 +213,7 @@ const (

type Message struct {
*pb.Message
ID string
ReceivedFrom peer.ID
ValidatorData interface{}
}
Expand Down Expand Up @@ -272,7 +273,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
blacklist: NewMapBlacklist(),
blacklistPeer: make(chan peer.ID),
seenMessages: timecache.NewTimeCache(TimeCacheDuration),
msgID: DefaultMsgIdFn,
idGen: newMsgIdGenerator(),
counter: uint64(time.Now().UnixNano()),
}

Expand Down Expand Up @@ -326,11 +327,7 @@ type MsgIdFunction func(pmsg *pb.Message) string
// but it can be customized to e.g. the hash of the message.
func WithMessageIdFn(fn MsgIdFunction) Option {
return func(p *PubSub) error {
p.msgID = fn
// the tracer Option may already be set. Update its message ID function to make options order-independent.
if p.tracer != nil {
p.tracer.msgID = fn
}
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
p.idGen.Default = fn
return nil
}
}
Expand Down Expand Up @@ -455,7 +452,7 @@ func WithEventTracer(tracer EventTracer) Option {
if p.tracer != nil {
p.tracer.tracer = tracer
} else {
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), msgID: p.msgID}
p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), idGen: p.idGen}
}
return nil
}
Expand All @@ -468,7 +465,7 @@ func WithRawTracer(tracer RawTracer) Option {
if p.tracer != nil {
p.tracer.raw = append(p.tracer.raw, tracer)
} else {
p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), idGen: p.idGen}
}
return nil
}
Expand Down Expand Up @@ -1047,8 +1044,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
continue
}

msg := &Message{pmsg, rpc.from, nil}
p.pushMsg(msg)
p.pushMsg(&Message{pmsg, "", rpc.from, nil})
}
}

Expand Down Expand Up @@ -1097,7 +1093,7 @@ func (p *PubSub) pushMsg(msg *Message) {
}

// have we already seen and validated this message?
id := p.msgID(msg.Message)
id := p.idGen.ID(msg)
if p.seenMessage(id) {
p.tracer.DuplicateMessage(msg)
return
Expand Down Expand Up @@ -1164,6 +1160,14 @@ type TopicOptions struct{}

type TopicOpt func(t *Topic) error

// WithTopicMessageIdFn sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules.
func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt {
return func(t *Topic) error {
t.p.idGen.Set(t.topic, msgId)
return nil
}
}

// Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if
// the Topic handle already exists.
func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) {
Expand Down