From da2f3edf543c1e13d63ca12a5a9a504fa76899ba Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 9 Jan 2022 18:05:29 +0200 Subject: [PATCH 01/10] feat: introduce msgIdGenerator and add ID field to Message wrapper --- midgen.go | 32 ++++++++++++++++++++++++++++++++ pubsub.go | 4 ++-- topic.go | 2 +- 3 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 midgen.go diff --git a/midgen.go b/midgen.go new file mode 100644 index 00000000..03293294 --- /dev/null +++ b/midgen.go @@ -0,0 +1,32 @@ +package pubsub + +import "sync" + +type msgIDGenerator struct { + defGen MsgIdFunction + + topicGens map[string]MsgIdFunction + topicGensLk sync.RWMutex +} + +func (m *msgIDGenerator) Add(topic string, gen MsgIdFunction) { + m.topicGensLk.Lock() + m.topicGens[topic] = gen + m.topicGensLk.Unlock() +} + +func (m *msgIDGenerator) GenID(msg *Message) string { + if msg.ID != "" { + return msg.ID + } + + m.topicGensLk.RLock() + gen, ok := m.topicGens[msg.GetTopic()] + m.topicGensLk.RUnlock() + if !ok { + gen = m.defGen + } + + msg.ID = gen(msg.Message) + return msg.ID +} diff --git a/pubsub.go b/pubsub.go index fdfa755b..cba16c59 100644 --- a/pubsub.go +++ b/pubsub.go @@ -213,6 +213,7 @@ const ( type Message struct { *pb.Message + ID string ReceivedFrom peer.ID ValidatorData interface{} } @@ -1047,8 +1048,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) { continue } - msg := &Message{pmsg, rpc.from, nil} - p.pushMsg(msg) + p.pushMsg(&Message{pmsg, "", rpc.from, nil}) } } diff --git a/topic.go b/topic.go index 8de88c3c..edec94ff 100644 --- a/topic.go +++ b/topic.go @@ -283,7 +283,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error } } - return t.p.val.PushLocal(&Message{m, t.p.host.ID(), nil}) + return t.p.val.PushLocal(&Message{m, "", t.p.host.ID(), nil}) } // WithReadiness returns a publishing option for only publishing when the router is ready. From 83d82d31c27483856d240a31a0c3f60fee0ef66b Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 9 Jan 2022 18:14:39 +0200 Subject: [PATCH 02/10] feat: integrate msgIdGenerator --- gossip_tracer.go | 8 ++++---- gossipsub.go | 8 ++++---- mcache.go | 20 ++++++++++---------- mcache_test.go | 18 +++++++++--------- midgen.go | 23 ++++++++++++++++++----- peer_gater.go | 2 +- pubsub.go | 20 ++++++++------------ score.go | 14 +++++++------- tag_tracer.go | 20 ++++++++++---------- trace.go | 12 ++++++------ validation.go | 2 +- 11 files changed, 78 insertions(+), 69 deletions(-) diff --git a/gossip_tracer.go b/gossip_tracer.go index 04e77193..a40b707c 100644 --- a/gossip_tracer.go +++ b/gossip_tracer.go @@ -15,7 +15,7 @@ import ( type gossipTracer struct { sync.Mutex - msgID MsgIdFunction + idGen *msgIDGenerator followUpTime time.Duration @@ -29,7 +29,7 @@ type gossipTracer struct { func newGossipTracer() *gossipTracer { return &gossipTracer{ - msgID: DefaultMsgIdFn, + idGen: newMsgIdGenerator(), promises: make(map[string]map[peer.ID]time.Time), peerPromises: make(map[peer.ID]map[string]struct{}), } @@ -40,7 +40,7 @@ func (gt *gossipTracer) Start(gs *GossipSubRouter) { return } - gt.msgID = gs.p.msgID + gt.idGen = gs.p.idGen gt.followUpTime = gs.params.IWantFollowupTime } @@ -117,7 +117,7 @@ func (gt *gossipTracer) GetBrokenPromises() map[peer.ID]int { var _ RawTracer = (*gossipTracer)(nil) func (gt *gossipTracer) fulfillPromise(msg *Message) { - mid := gt.msgID(msg.Message) + mid := gt.idGen.ID(msg) gt.Lock() defer gt.Unlock() diff --git a/gossipsub.go b/gossipsub.go index 7c2da306..0aa8a007 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -295,7 +295,7 @@ func WithPeerScore(params *PeerScoreParams, thresholds *PeerScoreThresholds) Opt ps.tracer = &pubsubTracer{ raw: []RawTracer{gs.score, gs.gossipTracer}, pid: ps.host.ID(), - msgID: ps.msgID, + idGen: ps.idGen, } } @@ -484,7 +484,7 @@ func (gs *GossipSubRouter) Attach(p *PubSub) { gs.tagTracer.Start(gs) // start using the same msg ID function as PubSub for caching messages. - gs.mcache.SetMsgIdFn(p.msgID) + gs.mcache.SetMsgIdFn(p.idGen.ID) // start the heartbeat go gs.heartbeatTimer() @@ -705,7 +705,7 @@ func (gs *GossipSubRouter) handleIWant(p peer.ID, ctl *pb.ControlMessage) []*pb. continue } - ihave[mid] = msg + ihave[mid] = msg.Message } } @@ -954,7 +954,7 @@ func (gs *GossipSubRouter) connector() { } func (gs *GossipSubRouter) Publish(msg *Message) { - gs.mcache.Put(msg.Message) + gs.mcache.Put(msg) from := msg.ReceivedFrom topic := msg.GetTopic() diff --git a/mcache.go b/mcache.go index e1f02ab3..889948eb 100644 --- a/mcache.go +++ b/mcache.go @@ -3,8 +3,6 @@ package pubsub import ( "fmt" - pb "github.com/libp2p/go-libp2p-pubsub/pb" - "github.com/libp2p/go-libp2p-core/peer" ) @@ -27,23 +25,25 @@ func NewMessageCache(gossip, history int) *MessageCache { panic(err) } return &MessageCache{ - msgs: make(map[string]*pb.Message), + msgs: make(map[string]*Message), peertx: make(map[string]map[peer.ID]int), history: make([][]CacheEntry, history), gossip: gossip, - msgID: DefaultMsgIdFn, + msgID: func(msg *Message) string { + return DefaultMsgIdFn(msg.Message) + }, } } type MessageCache struct { - msgs map[string]*pb.Message + msgs map[string]*Message peertx map[string]map[peer.ID]int history [][]CacheEntry gossip int - msgID MsgIdFunction + msgID func(*Message) string } -func (mc *MessageCache) SetMsgIdFn(msgID MsgIdFunction) { +func (mc *MessageCache) SetMsgIdFn(msgID func(*Message) string) { mc.msgID = msgID } @@ -52,18 +52,18 @@ type CacheEntry struct { topic string } -func (mc *MessageCache) Put(msg *pb.Message) { +func (mc *MessageCache) Put(msg *Message) { mid := mc.msgID(msg) mc.msgs[mid] = msg mc.history[0] = append(mc.history[0], CacheEntry{mid: mid, topic: msg.GetTopic()}) } -func (mc *MessageCache) Get(mid string) (*pb.Message, bool) { +func (mc *MessageCache) Get(mid string) (*Message, bool) { m, ok := mc.msgs[mid] return m, ok } -func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*pb.Message, int, bool) { +func (mc *MessageCache) GetForPeer(mid string, p peer.ID) (*Message, int, bool) { m, ok := mc.msgs[mid] if !ok { return nil, 0, false diff --git a/mcache_test.go b/mcache_test.go index e36c6b19..93bcfdc6 100644 --- a/mcache_test.go +++ b/mcache_test.go @@ -18,7 +18,7 @@ func TestMessageCache(t *testing.T) { } for i := 0; i < 10; i++ { - mcache.Put(msgs[i]) + mcache.Put(&Message{Message: msgs[i]}) } for i := 0; i < 10; i++ { @@ -28,7 +28,7 @@ func TestMessageCache(t *testing.T) { t.Fatalf("Message %d not in cache", i) } - if m != msgs[i] { + if m.Message != msgs[i] { t.Fatalf("Message %d does not match cache", i) } } @@ -47,7 +47,7 @@ func TestMessageCache(t *testing.T) { mcache.Shift() for i := 10; i < 20; i++ { - mcache.Put(msgs[i]) + mcache.Put(&Message{Message: msgs[i]}) } for i := 0; i < 20; i++ { @@ -57,7 +57,7 @@ func TestMessageCache(t *testing.T) { t.Fatalf("Message %d not in cache", i) } - if m != msgs[i] { + if m.Message != msgs[i] { t.Fatalf("Message %d does not match cache", i) } } @@ -83,22 +83,22 @@ func TestMessageCache(t *testing.T) { mcache.Shift() for i := 20; i < 30; i++ { - mcache.Put(msgs[i]) + mcache.Put(&Message{Message: msgs[i]}) } mcache.Shift() for i := 30; i < 40; i++ { - mcache.Put(msgs[i]) + mcache.Put(&Message{Message: msgs[i]}) } mcache.Shift() for i := 40; i < 50; i++ { - mcache.Put(msgs[i]) + mcache.Put(&Message{Message: msgs[i]}) } mcache.Shift() for i := 50; i < 60; i++ { - mcache.Put(msgs[i]) + mcache.Put(&Message{Message: msgs[i]}) } if len(mcache.msgs) != 50 { @@ -120,7 +120,7 @@ func TestMessageCache(t *testing.T) { t.Fatalf("Message %d not in cache", i) } - if m != msgs[i] { + if m.Message != msgs[i] { t.Fatalf("Message %d does not match cache", i) } } diff --git a/midgen.go b/midgen.go index 03293294..d09c87d6 100644 --- a/midgen.go +++ b/midgen.go @@ -1,21 +1,34 @@ package pubsub -import "sync" +import ( + "sync" +) +// msgIDGenerator handles computing IDs for msgs +// It allows setting custom generators(MsgIdFunction) per topic type msgIDGenerator struct { - defGen MsgIdFunction + Default MsgIdFunction topicGens map[string]MsgIdFunction topicGensLk sync.RWMutex } -func (m *msgIDGenerator) Add(topic string, gen MsgIdFunction) { +func newMsgIdGenerator() *msgIDGenerator{ + return &msgIDGenerator{ + Default: DefaultMsgIdFn, + topicGens: make(map[string]MsgIdFunction), + } +} + +// Set sets custom id generator(MsgIdFunction) for topic. +func (m *msgIDGenerator) Set(topic string, gen MsgIdFunction) { m.topicGensLk.Lock() m.topicGens[topic] = gen m.topicGensLk.Unlock() } -func (m *msgIDGenerator) GenID(msg *Message) string { +// ID computes ID for the msg or short-circuits with the cached value. +func (m *msgIDGenerator) ID(msg *Message) string { if msg.ID != "" { return msg.ID } @@ -24,7 +37,7 @@ func (m *msgIDGenerator) GenID(msg *Message) string { gen, ok := m.topicGens[msg.GetTopic()] m.topicGensLk.RUnlock() if !ok { - gen = m.defGen + gen = m.Default } msg.ID = gen(msg.Message) diff --git a/peer_gater.go b/peer_gater.go index e334324c..3da2755f 100644 --- a/peer_gater.go +++ b/peer_gater.go @@ -182,7 +182,7 @@ func WithPeerGater(params *PeerGaterParams) Option { ps.tracer = &pubsubTracer{ raw: []RawTracer{gs.gate}, pid: ps.host.ID(), - msgID: ps.msgID, + idGen: ps.idGen, } } diff --git a/pubsub.go b/pubsub.go index cba16c59..7caf5ada 100644 --- a/pubsub.go +++ b/pubsub.go @@ -20,7 +20,7 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" logging "github.com/ipfs/go-log" - timecache "github.com/whyrusleeping/timecache" + "github.com/whyrusleeping/timecache" ) // DefaultMaximumMessageSize is 1mb. @@ -147,8 +147,8 @@ type PubSub struct { seenMessagesMx sync.Mutex seenMessages *timecache.TimeCache - // function used to compute the ID for a message - msgID MsgIdFunction + // generator used to compute the ID for a message + idGen *msgIDGenerator // key for signing messages; nil when signing is disabled signKey crypto.PrivKey @@ -273,7 +273,7 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option blacklist: NewMapBlacklist(), blacklistPeer: make(chan peer.ID), seenMessages: timecache.NewTimeCache(TimeCacheDuration), - msgID: DefaultMsgIdFn, + idGen: newMsgIdGenerator(), counter: uint64(time.Now().UnixNano()), } @@ -327,11 +327,7 @@ type MsgIdFunction func(pmsg *pb.Message) string // but it can be customized to e.g. the hash of the message. func WithMessageIdFn(fn MsgIdFunction) Option { return func(p *PubSub) error { - p.msgID = fn - // the tracer Option may already be set. Update its message ID function to make options order-independent. - if p.tracer != nil { - p.tracer.msgID = fn - } + p.idGen.Default = fn return nil } } @@ -456,7 +452,7 @@ func WithEventTracer(tracer EventTracer) Option { if p.tracer != nil { p.tracer.tracer = tracer } else { - p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), msgID: p.msgID} + p.tracer = &pubsubTracer{tracer: tracer, pid: p.host.ID(), idGen: p.idGen} } return nil } @@ -469,7 +465,7 @@ func WithRawTracer(tracer RawTracer) Option { if p.tracer != nil { p.tracer.raw = append(p.tracer.raw, tracer) } else { - p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), msgID: p.msgID} + p.tracer = &pubsubTracer{raw: []RawTracer{tracer}, pid: p.host.ID(), idGen: p.idGen} } return nil } @@ -1097,7 +1093,7 @@ func (p *PubSub) pushMsg(msg *Message) { } // have we already seen and validated this message? - id := p.msgID(msg.Message) + id := p.idGen.ID(msg) if p.seenMessage(id) { p.tracer.DuplicateMessage(msg) return diff --git a/score.go b/score.go index 1ee61418..87753a18 100644 --- a/score.go +++ b/score.go @@ -76,7 +76,7 @@ type peerScore struct { // message delivery tracking deliveries *messageDeliveries - msgID MsgIdFunction + idGen *msgIDGenerator host host.Host // debugging inspection @@ -183,7 +183,7 @@ func newPeerScore(params *PeerScoreParams) *peerScore { peerStats: make(map[peer.ID]*peerStats), peerIPs: make(map[string]map[peer.ID]struct{}), deliveries: &messageDeliveries{records: make(map[string]*deliveryRecord)}, - msgID: DefaultMsgIdFn, + idGen: newMsgIdGenerator(), } } @@ -239,7 +239,7 @@ func (ps *peerScore) Start(gs *GossipSubRouter) { return } - ps.msgID = gs.p.msgID + ps.idGen = gs.p.idGen ps.host = gs.p.host go ps.background(gs.p.ctx) } @@ -689,7 +689,7 @@ func (ps *peerScore) ValidateMessage(msg *Message) { // the pubsub subsystem is beginning validation; create a record to track time in // the validation pipeline with an accurate firstSeen time. - _ = ps.deliveries.getRecord(ps.msgID(msg.Message)) + _ = ps.deliveries.getRecord(ps.idGen.ID(msg)) } func (ps *peerScore) DeliverMessage(msg *Message) { @@ -698,7 +698,7 @@ func (ps *peerScore) DeliverMessage(msg *Message) { ps.markFirstMessageDelivery(msg.ReceivedFrom, msg) - drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) + drec := ps.deliveries.getRecord(ps.idGen.ID(msg)) // defensive check that this is the first delivery trace -- delivery status should be unknown if drec.status != deliveryUnknown { @@ -749,7 +749,7 @@ func (ps *peerScore) RejectMessage(msg *Message, reason string) { return } - drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) + drec := ps.deliveries.getRecord(ps.idGen.ID(msg)) // defensive check that this is the first rejection trace -- delivery status should be unknown if drec.status != deliveryUnknown { @@ -789,7 +789,7 @@ func (ps *peerScore) DuplicateMessage(msg *Message) { ps.Lock() defer ps.Unlock() - drec := ps.deliveries.getRecord(ps.msgID(msg.Message)) + drec := ps.deliveries.getRecord(ps.idGen.ID(msg)) _, ok := drec.peers[msg.ReceivedFrom] if ok { diff --git a/tag_tracer.go b/tag_tracer.go index 65e99be9..ae7318d8 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -44,9 +44,9 @@ var ( type tagTracer struct { sync.RWMutex - cmgr connmgr.ConnManager - msgID MsgIdFunction - decayer connmgr.Decayer + cmgr connmgr.ConnManager + idGen *msgIDGenerator + decayer connmgr.Decayer decaying map[string]connmgr.DecayingTag direct map[peer.ID]struct{} @@ -62,7 +62,7 @@ func newTagTracer(cmgr connmgr.ConnManager) *tagTracer { } return &tagTracer{ cmgr: cmgr, - msgID: DefaultMsgIdFn, + idGen: newMsgIdGenerator(), decayer: decayer, decaying: make(map[string]connmgr.DecayingTag), nearFirst: make(map[string]map[peer.ID]struct{}), @@ -74,7 +74,7 @@ func (t *tagTracer) Start(gs *GossipSubRouter) { return } - t.msgID = gs.p.msgID + t.idGen = gs.p.idGen t.direct = gs.direct } @@ -162,7 +162,7 @@ func (t *tagTracer) bumpTagsForMessage(p peer.ID, msg *Message) { func (t *tagTracer) nearFirstPeers(msg *Message) []peer.ID { t.Lock() defer t.Unlock() - peersMap, ok := t.nearFirst[t.msgID(msg.Message)] + peersMap, ok := t.nearFirst[t.idGen.ID(msg)] if !ok { return nil } @@ -194,7 +194,7 @@ func (t *tagTracer) DeliverMessage(msg *Message) { // delete the delivery state for this message t.Lock() - delete(t.nearFirst, t.msgID(msg.Message)) + delete(t.nearFirst, t.idGen.ID(msg)) t.Unlock() } @@ -215,7 +215,7 @@ func (t *tagTracer) ValidateMessage(msg *Message) { defer t.Unlock() // create map to start tracking the peers who deliver while we're validating - id := t.msgID(msg.Message) + id := t.idGen.ID(msg) if _, exists := t.nearFirst[id]; exists { return } @@ -226,7 +226,7 @@ func (t *tagTracer) DuplicateMessage(msg *Message) { t.Lock() defer t.Unlock() - id := t.msgID(msg.Message) + id := t.idGen.ID(msg) peers, ok := t.nearFirst[id] if !ok { return @@ -247,7 +247,7 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) { case RejectValidationIgnored: fallthrough case RejectValidationFailed: - delete(t.nearFirst, t.msgID(msg.Message)) + delete(t.nearFirst, t.idGen.ID(msg)) } } diff --git a/trace.go b/trace.go index 32325422..7efd6653 100644 --- a/trace.go +++ b/trace.go @@ -64,7 +64,7 @@ type pubsubTracer struct { tracer EventTracer raw []RawTracer pid peer.ID - msgID MsgIdFunction + idGen *msgIDGenerator } func (t *pubsubTracer) PublishMessage(msg *Message) { @@ -82,7 +82,7 @@ func (t *pubsubTracer) PublishMessage(msg *Message) { PeerID: []byte(t.pid), Timestamp: &now, PublishMessage: &pb.TraceEvent_PublishMessage{ - MessageID: []byte(t.msgID(msg.Message)), + MessageID: []byte(t.idGen.ID(msg)), Topic: msg.Message.Topic, }, } @@ -123,7 +123,7 @@ func (t *pubsubTracer) RejectMessage(msg *Message, reason string) { PeerID: []byte(t.pid), Timestamp: &now, RejectMessage: &pb.TraceEvent_RejectMessage{ - MessageID: []byte(t.msgID(msg.Message)), + MessageID: []byte(t.idGen.ID(msg)), ReceivedFrom: []byte(msg.ReceivedFrom), Reason: &reason, Topic: msg.Topic, @@ -154,7 +154,7 @@ func (t *pubsubTracer) DuplicateMessage(msg *Message) { PeerID: []byte(t.pid), Timestamp: &now, DuplicateMessage: &pb.TraceEvent_DuplicateMessage{ - MessageID: []byte(t.msgID(msg.Message)), + MessageID: []byte(t.idGen.ID(msg)), ReceivedFrom: []byte(msg.ReceivedFrom), Topic: msg.Topic, }, @@ -184,7 +184,7 @@ func (t *pubsubTracer) DeliverMessage(msg *Message) { PeerID: []byte(t.pid), Timestamp: &now, DeliverMessage: &pb.TraceEvent_DeliverMessage{ - MessageID: []byte(t.msgID(msg.Message)), + MessageID: []byte(t.idGen.ID(msg)), Topic: msg.Topic, ReceivedFrom: []byte(msg.ReceivedFrom), }, @@ -344,7 +344,7 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { var msgs []*pb.TraceEvent_MessageMeta for _, m := range rpc.Publish { msgs = append(msgs, &pb.TraceEvent_MessageMeta{ - MessageID: []byte(t.msgID(m)), + MessageID: []byte(t.idGen.ID(&Message{Message: m})), Topic: m.Topic, }) } diff --git a/validation.go b/validation.go index 35d291a1..9fa28d26 100644 --- a/validation.go +++ b/validation.go @@ -284,7 +284,7 @@ func (v *validation) validate(vals []*topicVal, src peer.ID, msg *Message, synch // we can mark the message as seen now that we have verified the signature // and avoid invoking user validators more than once - id := v.p.msgID(msg.Message) + id := v.p.idGen.ID(msg) if !v.p.markSeen(id) { v.tracer.DuplicateMessage(msg) return nil From 9944f826a173a40929b42350b716ef070ad04eb2 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 9 Jan 2022 18:23:33 +0200 Subject: [PATCH 03/10] feat: new WithMsgIdFunction topic option to enable topics to have own msg id generation rules --- pubsub.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pubsub.go b/pubsub.go index 7caf5ada..e817be44 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1160,6 +1160,14 @@ type TopicOptions struct{} type TopicOpt func(t *Topic) error +// WithMsgIdFunction sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules. +func (p *PubSub) WithMsgIdFunction(msgId MsgIdFunction) TopicOpt { + return func(t *Topic) error { + t.p.idGen.Set(t.topic, msgId) + return nil + } +} + // Join joins the topic and returns a Topic handle. Only one Topic handle should exist per topic, and Join will error if // the Topic handle already exists. func (p *PubSub) Join(topic string, opts ...TopicOpt) (*Topic, error) { From 5775354f8b7c3243fbfc40df17584bfe481d0980 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 9 Jan 2022 18:41:10 +0200 Subject: [PATCH 04/10] chore: go fmt and return timecache named import --- midgen.go | 4 ++-- pubsub.go | 4 ++-- tag_tracer.go | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/midgen.go b/midgen.go index d09c87d6..4d4aec71 100644 --- a/midgen.go +++ b/midgen.go @@ -9,11 +9,11 @@ import ( type msgIDGenerator struct { Default MsgIdFunction - topicGens map[string]MsgIdFunction + topicGens map[string]MsgIdFunction topicGensLk sync.RWMutex } -func newMsgIdGenerator() *msgIDGenerator{ +func newMsgIdGenerator() *msgIDGenerator { return &msgIDGenerator{ Default: DefaultMsgIdFn, topicGens: make(map[string]MsgIdFunction), diff --git a/pubsub.go b/pubsub.go index e817be44..0a72f134 100644 --- a/pubsub.go +++ b/pubsub.go @@ -20,7 +20,7 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" logging "github.com/ipfs/go-log" - "github.com/whyrusleeping/timecache" + timecache "github.com/whyrusleeping/timecache" ) // DefaultMaximumMessageSize is 1mb. @@ -213,7 +213,7 @@ const ( type Message struct { *pb.Message - ID string + ID string ReceivedFrom peer.ID ValidatorData interface{} } diff --git a/tag_tracer.go b/tag_tracer.go index ae7318d8..333edf84 100644 --- a/tag_tracer.go +++ b/tag_tracer.go @@ -44,9 +44,9 @@ var ( type tagTracer struct { sync.RWMutex - cmgr connmgr.ConnManager - idGen *msgIDGenerator - decayer connmgr.Decayer + cmgr connmgr.ConnManager + idGen *msgIDGenerator + decayer connmgr.Decayer decaying map[string]connmgr.DecayingTag direct map[peer.ID]struct{} From c71b1b4a5ecac2026e0890809452c17ce47465ed Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 22 Jan 2022 13:27:50 +0200 Subject: [PATCH 05/10] chore: hello mister mutex hat --- midgen.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/midgen.go b/midgen.go index 4d4aec71..972f1b02 100644 --- a/midgen.go +++ b/midgen.go @@ -9,8 +9,8 @@ import ( type msgIDGenerator struct { Default MsgIdFunction - topicGens map[string]MsgIdFunction topicGensLk sync.RWMutex + topicGens map[string]MsgIdFunction } func newMsgIdGenerator() *msgIDGenerator { From 775cda46f17f2d143c8ce29dff8422aae3d2398d Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 22 Jan 2022 13:34:19 +0200 Subject: [PATCH 06/10] feat: extract RawID from ID --- midgen.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/midgen.go b/midgen.go index 972f1b02..9d3acfc1 100644 --- a/midgen.go +++ b/midgen.go @@ -2,6 +2,8 @@ package pubsub import ( "sync" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" ) // msgIDGenerator handles computing IDs for msgs @@ -33,6 +35,12 @@ func (m *msgIDGenerator) ID(msg *Message) string { return msg.ID } + msg.ID = m.RawID(msg.Message) + return msg.ID +} + +// RawID computes ID for the proto 'msg'. +func (m *msgIDGenerator) RawID(msg *pb.Message) string { m.topicGensLk.RLock() gen, ok := m.topicGens[msg.GetTopic()] m.topicGensLk.RUnlock() @@ -40,6 +48,5 @@ func (m *msgIDGenerator) ID(msg *Message) string { gen = m.Default } - msg.ID = gen(msg.Message) - return msg.ID + return gen(msg) } From fbb9a6da7f09304eea0c592c4d8c04831facd9f2 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 22 Jan 2022 13:42:02 +0200 Subject: [PATCH 07/10] fix: use RawID in traceRPCMeta to avoid allocations --- trace.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/trace.go b/trace.go index 7efd6653..1426e318 100644 --- a/trace.go +++ b/trace.go @@ -344,7 +344,7 @@ func (t *pubsubTracer) traceRPCMeta(rpc *RPC) *pb.TraceEvent_RPCMeta { var msgs []*pb.TraceEvent_MessageMeta for _, m := range rpc.Publish { msgs = append(msgs, &pb.TraceEvent_MessageMeta{ - MessageID: []byte(t.idGen.ID(&Message{Message: m})), + MessageID: []byte(t.idGen.RawID(m)), Topic: m.Topic, }) } From e3997c32f892217fefa0c977e8029dff0e5f5cdd Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 22 Jan 2022 14:00:57 +0200 Subject: [PATCH 08/10] feat: detach WithMsgIdFunction --- pubsub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub.go b/pubsub.go index 0a72f134..e5f8c04e 100644 --- a/pubsub.go +++ b/pubsub.go @@ -20,7 +20,7 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" logging "github.com/ipfs/go-log" - timecache "github.com/whyrusleeping/timecache" + "github.com/whyrusleeping/timecache" ) // DefaultMaximumMessageSize is 1mb. @@ -1161,7 +1161,7 @@ type TopicOptions struct{} type TopicOpt func(t *Topic) error // WithMsgIdFunction sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules. -func (p *PubSub) WithMsgIdFunction(msgId MsgIdFunction) TopicOpt { +func WithMsgIdFunction(msgId MsgIdFunction) TopicOpt { return func(t *Topic) error { t.p.idGen.Set(t.topic, msgId) return nil From ed38981f254b993554d7f0c657b375a9f0e3c33e Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sat, 22 Jan 2022 23:48:23 +0200 Subject: [PATCH 09/10] chore: better name --- pubsub.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pubsub.go b/pubsub.go index e5f8c04e..901314b9 100644 --- a/pubsub.go +++ b/pubsub.go @@ -1160,8 +1160,8 @@ type TopicOptions struct{} type TopicOpt func(t *Topic) error -// WithMsgIdFunction sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules. -func WithMsgIdFunction(msgId MsgIdFunction) TopicOpt { +// WithTopicMessageIdFn sets custom MsgIdFunction for a Topic, enabling topics to have own msg id generation rules. +func WithTopicMessageIdFn(msgId MsgIdFunction) TopicOpt { return func(t *Topic) error { t.p.idGen.Set(t.topic, msgId) return nil From 8f42357e1b2a7cd90e0e3a5cae19c759546b1f0f Mon Sep 17 00:00:00 2001 From: Wondertan Date: Sun, 23 Jan 2022 00:06:47 +0200 Subject: [PATCH 10/10] tests: new test for WithTopicMsgIdFunction --- topic_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/topic_test.go b/topic_test.go index 52927c0a..e236c602 100644 --- a/topic_test.go +++ b/topic_test.go @@ -3,6 +3,8 @@ package pubsub import ( "bytes" "context" + "crypto/sha1" + "crypto/sha256" "errors" "fmt" "math/rand" @@ -11,6 +13,7 @@ import ( "time" "github.com/libp2p/go-libp2p-core/peer" + pb "github.com/libp2p/go-libp2p-pubsub/pb" ) func getTopics(psubs []*PubSub, topicID string, opts ...TopicOpt) []*Topic { @@ -860,3 +863,60 @@ func TestMinTopicSizeNoDiscovery(t *testing.T) { t.Fatal("received incorrect message") } } + +func TestWithTopicMsgIdFunction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topicA, topicB = "foobarA", "foobarB" + const numHosts = 2 + + hosts := getNetHosts(t, ctx, numHosts) + pubsubs := getPubsubs(ctx, hosts, WithMessageIdFn(func(pmsg *pb.Message) string { + hash := sha256.Sum256(pmsg.Data) + return string(hash[:]) + })) + connectAll(t, hosts) + + topicsA := getTopics(pubsubs, topicA) // uses global msgIdFn + topicsB := getTopics(pubsubs, topicB, WithTopicMessageIdFn(func(pmsg *pb.Message) string { // uses custom + hash := sha1.Sum(pmsg.Data) + return string(hash[:]) + })) + + payload := []byte("pubsub rocks") + + subA, err := topicsA[0].Subscribe() + if err != nil { + t.Fatal(err) + } + + err = topicsA[1].Publish(ctx, payload, WithReadiness(MinTopicSize(1))) + if err != nil { + t.Fatal(err) + } + + msgA, err := subA.Next(ctx) + if err != nil { + t.Fatal(err) + } + + subB, err := topicsB[0].Subscribe() + if err != nil { + t.Fatal(err) + } + + err = topicsB[1].Publish(ctx, payload, WithReadiness(MinTopicSize(1))) + if err != nil { + t.Fatal(err) + } + + msgB, err := subB.Next(ctx) + if err != nil { + t.Fatal(err) + } + + if msgA.ID == msgB.ID { + t.Fatal("msg ids are equal") + } +}