diff --git a/topic_test.go b/topic_test.go index 52927c0a..e236c602 100644 --- a/topic_test.go +++ b/topic_test.go @@ -3,6 +3,8 @@ package pubsub import ( "bytes" "context" + "crypto/sha1" + "crypto/sha256" "errors" "fmt" "math/rand" @@ -11,6 +13,7 @@ import ( "time" "github.com/libp2p/go-libp2p-core/peer" + pb "github.com/libp2p/go-libp2p-pubsub/pb" ) func getTopics(psubs []*PubSub, topicID string, opts ...TopicOpt) []*Topic { @@ -860,3 +863,60 @@ func TestMinTopicSizeNoDiscovery(t *testing.T) { t.Fatal("received incorrect message") } } + +func TestWithTopicMsgIdFunction(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + const topicA, topicB = "foobarA", "foobarB" + const numHosts = 2 + + hosts := getNetHosts(t, ctx, numHosts) + pubsubs := getPubsubs(ctx, hosts, WithMessageIdFn(func(pmsg *pb.Message) string { + hash := sha256.Sum256(pmsg.Data) + return string(hash[:]) + })) + connectAll(t, hosts) + + topicsA := getTopics(pubsubs, topicA) // uses global msgIdFn + topicsB := getTopics(pubsubs, topicB, WithTopicMessageIdFn(func(pmsg *pb.Message) string { // uses custom + hash := sha1.Sum(pmsg.Data) + return string(hash[:]) + })) + + payload := []byte("pubsub rocks") + + subA, err := topicsA[0].Subscribe() + if err != nil { + t.Fatal(err) + } + + err = topicsA[1].Publish(ctx, payload, WithReadiness(MinTopicSize(1))) + if err != nil { + t.Fatal(err) + } + + msgA, err := subA.Next(ctx) + if err != nil { + t.Fatal(err) + } + + subB, err := topicsB[0].Subscribe() + if err != nil { + t.Fatal(err) + } + + err = topicsB[1].Publish(ctx, payload, WithReadiness(MinTopicSize(1))) + if err != nil { + t.Fatal(err) + } + + msgB, err := subB.Next(ctx) + if err != nil { + t.Fatal(err) + } + + if msgA.ID == msgB.ID { + t.Fatal("msg ids are equal") + } +}