Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trial for js.Request #1107

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 82 additions & 0 deletions js.go
Expand Up @@ -52,6 +52,8 @@ type JetStream interface {
// PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd.
PublishAsyncComplete() <-chan struct{}

Request(subj string, data []byte, timeout time.Duration, opts ...PubOpt) (*Msg, error)

// Subscribe creates an async Subscription for JetStream.
// The stream and consumer names can be provided with the nats.Bind() option.
// For creating an ephemeral (where the consumer name is picked by the server),
Expand Down Expand Up @@ -462,6 +464,7 @@ const (
ExpectedLastSubjSeqHdr = "Nats-Expected-Last-Subject-Sequence"
ExpectedLastMsgIdHdr = "Nats-Expected-Last-Msg-Id"
MsgRollup = "Nats-Rollup"
MsgReplyHdr = "Nats-Reply"
)

// Headers for republished messages and direct gets.
Expand Down Expand Up @@ -579,6 +582,85 @@ func (js *js) Publish(subj string, data []byte, opts ...PubOpt) (*PubAck, error)
return js.PublishMsg(&Msg{Subject: subj, Data: data}, opts...)
}

// Helper to setup and send new request style requests. Return the chan to receive the response.
// TODO: could be refactored out of nc.createNewRequestAndSend to remove duplicate code
func (js *js) createNewReplySub() (string, chan *Msg, string, error) {
js.nc.mu.Lock()
// Do setup for the new style if needed.
if js.nc.respMap == nil {
js.nc.initNewResp()
}
// Create new literal Inbox and map to a chan msg.
mch := make(chan *Msg, RequestChanLen)
respInbox := js.nc.newRespInbox()
token := respInbox[js.nc.respSubLen:]

js.nc.respMap[token] = mch
if js.nc.respMux == nil {
// Create the response subscription we will use for all new style responses.
// This will be on an _INBOX with an additional terminal token. The subscription
// will be on a wildcard.
s, err := js.nc.subscribeLocked(js.nc.respSub, _EMPTY_, js.nc.respHandler, nil, false, nil)
if err != nil {
js.nc.mu.Unlock()
return "", nil, token, err
}
js.nc.respScanf = strings.Replace(js.nc.respSub, "*", "%s", -1)
js.nc.respMux = s
}
js.nc.mu.Unlock()

return respInbox, mch, token, nil
}

// Request publishes a message to a stream and waits for a subscriber to reply.
func (js *js) Request(subj string, data []byte, timeout time.Duration, opts ...PubOpt) (*Msg, error) {
// Allocate a new inbox subject and channel for receiving a message
// from the multiplexed inbox subscriber.
inbox, mch, token, err := js.createNewReplySub()
if err != nil {
return nil, err
}

// Build a message, setting the reply header.
msg := NewMsg(subj)
msg.Data = data
msg.Header.Add(MsgReplyHdr, inbox)

// Publish to the stream. If this fails, delete the reply channel.
puback, err := js.PublishMsg(msg)
if err != nil {
js.nc.mu.Lock()
delete(js.nc.respMap, token)
js.nc.mu.Unlock()
return nil, err
}

// Get a timer and wait for the message. If there is a timeout or other
// error, the message will be deleted from the stream.
// TODO: make auto-delete configurable?
t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)

var ok bool

select {
case msg, ok = <-mch:
if !ok {
js.DeleteMsg(puback.Stream, puback.Sequence, Domain(puback.Domain))
return nil, ErrConnectionClosed
}
case <-t.C:
js.nc.mu.Lock()
delete(js.nc.respMap, token)
js.nc.mu.Unlock()
js.DeleteMsg(puback.Stream, puback.Sequence, Domain(puback.Domain))
return nil, ErrTimeout
}

return msg, nil
}

// PubAckFuture is a future for a PubAck.
type PubAckFuture interface {
// Ok returns a receive only channel that can be used to get a PubAck.
Expand Down
86 changes: 83 additions & 3 deletions js_test.go
Expand Up @@ -37,7 +37,7 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"
)

func client(t *testing.T, s *server.Server, opts ...Option) *Conn {
func client(t testing.TB, s *server.Server, opts ...Option) *Conn {
t.Helper()
nc, err := Connect(s.ClientURL(), opts...)
if err != nil {
Expand All @@ -46,7 +46,7 @@ func client(t *testing.T, s *server.Server, opts ...Option) *Conn {
return nc
}

func jsClient(t *testing.T, s *server.Server, opts ...Option) (*Conn, JetStreamContext) {
func jsClient(t testing.TB, s *server.Server, opts ...Option) (*Conn, JetStreamContext) {
t.Helper()
nc := client(t, s, opts...)
js, err := nc.JetStream(MaxWait(10 * time.Second))
Expand Down Expand Up @@ -82,7 +82,7 @@ func createConfFile(t *testing.T, content []byte) string {
return fName
}

func shutdownJSServerAndRemoveStorage(t *testing.T, s *server.Server) {
func shutdownJSServerAndRemoveStorage(t testing.TB, s *server.Server) {
t.Helper()
var sd string
if config := s.JetStreamConfig(); config != nil {
Expand Down Expand Up @@ -1230,3 +1230,83 @@ func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) {
t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects))
}
}

func TestJetStreamRequest(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
Retention: WorkQueuePolicy,
Storage: MemoryStorage,
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

msgch := make(chan *Msg, 1)
errch := make(chan error, 1)

go func() {
// Send a request via the stream.
rep, err := js.Request("test.foo", nil, time.Second)
if err != nil {
errch <- err
} else {
msgch <- rep
}
}()

// Delay to late bind a susbcriber...
time.Sleep(200 * time.Millisecond)

// Subscribe to the subject.
_, err = js.Subscribe("test.foo", func(msg *Msg) {
msg.Respond([]byte("hello"))
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

select {
case rep := <-msgch:
t.Logf("msg data: %q", string(rep.Data))
case err := <-errch:
t.Fatalf("Unexpected error: %s", err)
}
}

func BenchmarkJetStreamRequest(b *testing.B) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(b, s)

nc, js := jsClient(b, s)
defer nc.Close()

_, err := js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
Retention: WorkQueuePolicy,
Storage: MemoryStorage,
})
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}

// Subscribe to the subject.
_, err = js.Subscribe("test.foo", func(msg *Msg) {
msg.Respond([]byte("hello"))
})
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}

// Send a request via the stream.
for i := 0; i < b.N; i++ {
js.Request("test.foo", nil, time.Second)
}
}
14 changes: 12 additions & 2 deletions nats.go
Expand Up @@ -4626,14 +4626,24 @@ func (m *Msg) Respond(data []byte) error {
if m == nil || m.Sub == nil {
return ErrMsgNotBound
}
if m.Reply == "" {

reply := m.Reply

// Check if explicit JS-based header is present.
// TODO: should this trigger a msg.Ack automatically?
pubReply := m.Header.Get(MsgReplyHdr)
if pubReply != "" {
reply = pubReply
}

if reply == "" {
return ErrMsgNoReply
}
m.Sub.mu.Lock()
nc := m.Sub.conn
m.Sub.mu.Unlock()
// No need to check the connection here since the call to publish will do all the checking.
return nc.Publish(m.Reply, data)
return nc.Publish(reply, data)
}

// RespondMsg allows a convenient way to respond to requests in service based subscriptions that might include headers
Expand Down