Skip to content

Commit

Permalink
zmq4: resend subscriptions in socket.addConn
Browse files Browse the repository at this point in the history
  • Loading branch information
thielepaul committed Jun 14, 2022
1 parent 16d169c commit 630578d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 6 deletions.
19 changes: 13 additions & 6 deletions socket.go
Expand Up @@ -30,12 +30,13 @@ var (

// socket implements the ZeroMQ socket interface
type socket struct {
ep string // socket end-point
typ SocketType
id SocketIdentity
retry time.Duration
sec Security
log *log.Logger
ep string // socket end-point
typ SocketType
id SocketIdentity
retry time.Duration
sec Security
log *log.Logger
subTopics func() []string

mu sync.RWMutex
ids map[string]*Conn // ZMTP connection IDs
Expand Down Expand Up @@ -286,6 +287,12 @@ func (sck *socket) addConn(c *Conn) {
if sck.r != nil {
sck.r.addConn(c)
}
// resend subscriptions for topics if there are any
if sck.subTopics != nil {
for _, topic := range sck.subTopics() {
_ = sck.Send(NewMsg(append([]byte{1}, topic...)))
}
}
sck.mu.Unlock()
}

Expand Down
35 changes: 35 additions & 0 deletions socket_test.go
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"io"
"net"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -220,3 +221,37 @@ func TestConnReaperDeadlock(t *testing.T) {
clients[i].Close()
}
}

func TestConnMaxRetries(t *testing.T) {
endpoint := "inproc://test-resub"

sub := zmq4.NewSub(context.Background())
defer sub.Close()
pub := zmq4.NewPub(context.Background())
defer pub.Close()
sub.SetOption(zmq4.OptionSubscribe, "test")
if err := sub.Listen(endpoint); err != nil {
t.Errorf("Sub Dial failed: %v", err)
}
if err := pub.Dial(endpoint); err != nil {
t.Errorf("Pub Dial failed: %v", err)
}
wg := new(sync.WaitGroup)
defer wg.Wait()
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(1 * time.Millisecond)
err := pub.Send(zmq4.NewMsgFromString([]string{"test"}))
if err != nil {
return
}
}()
msg, err := sub.Recv()
if err != nil {
t.Errorf("Recv failed: %v", err)
}
if string(msg.Frames[0]) != "test" {
t.Errorf("Expected 'test' got '%s'", msg.String())
}
}
1 change: 1 addition & 0 deletions sub.go
Expand Up @@ -16,6 +16,7 @@ import (
func NewSub(ctx context.Context, opts ...Option) Socket {
sub := &subSocket{sck: newSocket(ctx, Sub, opts...)}
sub.sck.r = newQReader(sub.sck.ctx)
sub.sck.subTopics = sub.Topics
sub.topics = make(map[string]struct{})
return sub
}
Expand Down

0 comments on commit 630578d

Please sign in to comment.