Skip to content

Commit

Permalink
Merge pull request #3158 from nats-io/kv-direct-get
Browse files Browse the repository at this point in the history
[IMPROVED] Fast and Direct access to stream messages.
  • Loading branch information
derekcollison committed Jun 5, 2022
2 parents d836076 + 0979bce commit fddc31a
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 2 deletions.
6 changes: 6 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ const (
JSApiMsgGet = "$JS.API.STREAM.MSG.GET.*"
JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"

// JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
// Will return the message similar to how a consumer receives the message, no JSON processing.
// If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
JSDirectMsgGet = "$JS.DS.GET.*"
JSDirectMsgGetT = "$JS.DS.GET.%s"

// JSApiConsumerCreate is the endpoint to create ephemeral consumers for streams.
// Will return JSON response.
JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"
Expand Down
144 changes: 143 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17746,6 +17746,99 @@ func TestJetStreamMsgGetNoAdvisory(t *testing.T) {
checkSubsPending(t, sub, 0)
}

func TestJetStreamDirectMsgGet(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

// Do by hand for now.
cfg := &StreamConfig{
Name: "DSMG",
Storage: MemoryStorage,
Subjects: []string{"foo", "bar", "baz"},
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)

sendStreamMsg(t, nc, "foo", "foo")
sendStreamMsg(t, nc, "bar", "bar")
sendStreamMsg(t, nc, "baz", "baz")

getSubj := fmt.Sprintf(JSDirectMsgGetT, "DSMG")
getMsg := func(req *JSApiMsgGetRequest) *nats.Msg {
var b []byte
var err error
if req != nil {
b, err = json.Marshal(req)
require_NoError(t, err)
}
m, err := nc.Request(getSubj, b, time.Second)
require_NoError(t, err)
return m
}

m := getMsg(&JSApiMsgGetRequest{LastFor: "foo"})
require_True(t, string(m.Data) == "foo")
require_True(t, m.Header.Get(JSStream) == "DSMG")
require_True(t, m.Header.Get(JSSequence) == "1")
require_True(t, m.Header.Get(JSSubject) == "foo")
require_True(t, m.Subject != "foo")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)

m = getMsg(&JSApiMsgGetRequest{LastFor: "bar"})
require_True(t, string(m.Data) == "bar")
require_True(t, m.Header.Get(JSStream) == "DSMG")
require_True(t, m.Header.Get(JSSequence) == "2")
require_True(t, m.Header.Get(JSSubject) == "bar")
require_True(t, m.Subject != "bar")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)

m = getMsg(&JSApiMsgGetRequest{LastFor: "baz"})
require_True(t, string(m.Data) == "baz")
require_True(t, m.Header.Get(JSStream) == "DSMG")
require_True(t, m.Header.Get(JSSequence) == "3")
require_True(t, m.Header.Get(JSSubject) == "baz")
require_True(t, m.Subject != "baz")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)

// Test error conditions

// Nil request
m = getMsg(nil)
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "408")
require_True(t, m.Header.Get("Description") == "Empty Request")

// Empty request
m = getMsg(&JSApiMsgGetRequest{})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "408")
require_True(t, m.Header.Get("Description") == "Empty Request")

// Both set
m = getMsg(&JSApiMsgGetRequest{Seq: 1, LastFor: "foo"})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "408")
require_True(t, m.Header.Get("Description") == "Bad Request")

// Not found
m = getMsg(&JSApiMsgGetRequest{LastFor: "foobar"})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "404")
require_True(t, m.Header.Get("Description") == "Message Not Found")

m = getMsg(&JSApiMsgGetRequest{Seq: 22})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "404")
require_True(t, m.Header.Get("Description") == "Message Not Found")
}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -18047,10 +18140,59 @@ func TestJetStreamKVMemoryStorePerf(t *testing.T) {

start = time.Now()
for i := 0; i < 100_000; i++ {
_, err := kv.PutString(fmt.Sprintf("foo.%d", i), "HELLO")
_, err := kv.PutString(fmt.Sprintf("foo.%d", i), "HELLO WORLD")
require_NoError(t, err)
}
fmt.Printf("Took %v for second run\n", time.Since(start))

start = time.Now()
for i := 0; i < 100_000; i++ {
_, err := kv.Get(fmt.Sprintf("foo.%d", i))
require_NoError(t, err)
}
fmt.Printf("Took %v for get\n", time.Since(start))
}

func TestJetStreamKVMemoryStoreDirectGetPerf(t *testing.T) {
// Comment out to run, holding place for now.
t.SkipNow()

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &StreamConfig{
Name: "TEST",
Storage: MemoryStorage,
Subjects: []string{"foo.*"},
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)

start := time.Now()
for i := 0; i < 100_000; i++ {
_, err := js.Publish(fmt.Sprintf("foo.%d", i), []byte("HELLO"))
require_NoError(t, err)
}
fmt.Printf("Took %v for put\n", time.Since(start))

getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")

const tmpl = "{\"last_by_subj\":%q}"

start = time.Now()
for i := 0; i < 100_000; i++ {
req := []byte(fmt.Sprintf(tmpl, fmt.Sprintf("foo.%d", i)))
_, err := nc.Request(getSubj, req, time.Second)
require_NoError(t, err)
}
fmt.Printf("Took %v for get\n", time.Since(start))
}

func TestJetStreamMultiplePullPerf(t *testing.T) {
Expand Down
80 changes: 79 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type StreamConfig struct {
// AllowRollup allows messages to be placed into the system and purge
// all older messages using a special msg header.
AllowRollup bool `json:"allow_rollup_hdrs"`
// Allow higher peformance, direct access to get individual messages.
AllowDirect bool `json:"allow_direct,omitempty"`
}

// RePublish is for republishing messages once committed to a stream.
Expand Down Expand Up @@ -265,10 +267,12 @@ const (
JSResponseType = "Nats-Response-Type"
)

// Headers for republished messages.
// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
JSSubject = "Nats-Subject"
JSLastSequence = "Nats-Last-Sequence"
)

Expand Down Expand Up @@ -2844,6 +2848,12 @@ func (mset *stream) subscribeToStream() error {
return err
}
}
if mset.cfg.AllowDirect {
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
if _, err := mset.subscribeInternal(dsubj, mset.processDirectGetRequest); err != nil {
return err
}
}

mset.active = true
return nil
Expand Down Expand Up @@ -2880,6 +2890,9 @@ func (mset *stream) unsubscribeToStream() error {
mset.stopSourceConsumers()
}

// In case we had a direct get subscription.
mset.unsubscribeInternal(fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name))

mset.active = false
return nil
}
Expand Down Expand Up @@ -3163,6 +3176,71 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}

// processDirectGetRequest handles direct get request for stream messages.
func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
if len(msg) == 0 {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
var req JSApiMsgGetRequest
err := json.Unmarshal(msg, &req)
if err != nil {
hdr := []byte("NATS/1.0 408 Malformed Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
// Check if nothing set.
if req.Seq == 0 && req.LastFor == _EMPTY_ {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
// Check that we do not have both options set.
if req.Seq > 0 && req.LastFor != _EMPTY_ {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

var svp StoreMsg
var sm *StoreMsg

mset.mu.RLock()
store, name := mset.store, mset.cfg.Name
mset.mu.RUnlock()

if req.Seq > 0 {
sm, err = store.LoadMsg(req.Seq, &svp)
} else {
sm, err = store.LoadLastMsg(req.LastFor, &svp)
}
if err != nil {
hdr := []byte("NATS/1.0 404 Message Not Found\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

hdr := sm.hdr
ts := time.Unix(0, sm.ts).UTC()

if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %v\r\n\r\n"
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts))
} else {
hdr = copyBytes(hdr)
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, sm.subj)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))
hdr = genHeader(hdr, JSTimeStamp, ts.String())
}
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))
}

// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
mset.mu.RLock()
Expand Down

0 comments on commit fddc31a

Please sign in to comment.