diff --git a/socket.go b/socket.go index 6ea5719..57831a0 100644 --- a/socket.go +++ b/socket.go @@ -30,12 +30,13 @@ var ( // socket implements the ZeroMQ socket interface type socket struct { - ep string // socket end-point - typ SocketType - id SocketIdentity - retry time.Duration - sec Security - log *log.Logger + ep string // socket end-point + typ SocketType + id SocketIdentity + retry time.Duration + sec Security + log *log.Logger + subTopics func() []string mu sync.RWMutex ids map[string]*Conn // ZMTP connection IDs @@ -286,6 +287,12 @@ func (sck *socket) addConn(c *Conn) { if sck.r != nil { sck.r.addConn(c) } + // resend subscriptions for topics if there are any + if sck.subTopics != nil { + for _, topic := range sck.subTopics() { + _ = sck.Send(NewMsg(append([]byte{1}, topic...))) + } + } sck.mu.Unlock() } diff --git a/socket_test.go b/socket_test.go index a1ea891..da1e9e0 100644 --- a/socket_test.go +++ b/socket_test.go @@ -9,6 +9,7 @@ import ( "fmt" "io" "net" + "sync" "testing" "time" @@ -220,3 +221,37 @@ func TestConnReaperDeadlock(t *testing.T) { clients[i].Close() } } + +func TestConnMaxRetries(t *testing.T) { + endpoint := "inproc://test-resub" + + sub := zmq4.NewSub(context.Background()) + defer sub.Close() + pub := zmq4.NewPub(context.Background()) + defer pub.Close() + sub.SetOption(zmq4.OptionSubscribe, "test") + if err := sub.Listen(endpoint); err != nil { + t.Errorf("Sub Dial failed: %v", err) + } + if err := pub.Dial(endpoint); err != nil { + t.Errorf("Pub Dial failed: %v", err) + } + wg := new(sync.WaitGroup) + defer wg.Wait() + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(1 * time.Millisecond) + err := pub.Send(zmq4.NewMsgFromString([]string{"test"})) + if err != nil { + return + } + }() + msg, err := sub.Recv() + if err != nil { + t.Errorf("Recv failed: %v", err) + } + if string(msg.Frames[0]) != "test" { + t.Errorf("Expected 'test' got '%s'", msg.String()) + } +} diff --git a/sub.go b/sub.go index c530226..008d6b1 100644 --- a/sub.go +++ b/sub.go @@ -16,6 +16,7 @@ import ( func NewSub(ctx context.Context, opts ...Option) Socket { sub := &subSocket{sck: newSocket(ctx, Sub, opts...)} sub.sck.r = newQReader(sub.sck.ctx) + sub.sck.subTopics = sub.Topics sub.topics = make(map[string]struct{}) return sub }