Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IMPROVED] Fast and Direct access to stream messages. #3158

Merged
merged 2 commits into from
Jun 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ const (
JSApiMsgGet = "$JS.API.STREAM.MSG.GET.*"
JSApiMsgGetT = "$JS.API.STREAM.MSG.GET.%s"

// JSDirectMsgGet is the template for non-api layer direct requests for a message by its stream sequence number or last by subject.
// Will return the message similar to how a consumer receives the message, no JSON processing.
// If the message can not be found we will use a status header of 404. If the stream does not exist the client will get a no-responders or timeout.
JSDirectMsgGet = "$JS.DS.GET.*"
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
JSDirectMsgGetT = "$JS.DS.GET.%s"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesnt domains allow 2 domains to host the same name stream? But the core subject space is shared by all domains in an account? So just saying stream name could lead to issues right?

Not that up to speed with domains so might just be confused

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair question.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be thought through a bit more, but even now if those streams have the same subjects they will be placed into both.

I also think we should place the direct get in a DQ group by default such that replicas can share the load.

This is experimental for now as we settle in the API and subjects and mechanics. Will merge.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm the domain scenario where 2 streams have the same name and subject space - a monumentally bad idea - isn’t about replicas it’s about like regional to hub migration/consolidation.

So it’s not about replicas sharing the load. Data might just not be there.

Wirh this I would say direct must not be default behaviour but sometning you opt in when you really grok your setup?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Direct is opt-in, agree you don't want to cross over with Domains, just saying that issues already is present with ingest subjects.


// JSApiConsumerCreate is the endpoint to create ephemeral consumers for streams.
// Will return JSON response.
JSApiConsumerCreate = "$JS.API.CONSUMER.CREATE.*"
Expand Down
144 changes: 143 additions & 1 deletion server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17746,6 +17746,99 @@ func TestJetStreamMsgGetNoAdvisory(t *testing.T) {
checkSubsPending(t, sub, 0)
}

func TestJetStreamDirectMsgGet(t *testing.T) {
s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

// Do by hand for now.
cfg := &StreamConfig{
Name: "DSMG",
Storage: MemoryStorage,
Subjects: []string{"foo", "bar", "baz"},
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)

sendStreamMsg(t, nc, "foo", "foo")
sendStreamMsg(t, nc, "bar", "bar")
sendStreamMsg(t, nc, "baz", "baz")

getSubj := fmt.Sprintf(JSDirectMsgGetT, "DSMG")
getMsg := func(req *JSApiMsgGetRequest) *nats.Msg {
var b []byte
var err error
if req != nil {
b, err = json.Marshal(req)
require_NoError(t, err)
}
m, err := nc.Request(getSubj, b, time.Second)
require_NoError(t, err)
return m
}

m := getMsg(&JSApiMsgGetRequest{LastFor: "foo"})
require_True(t, string(m.Data) == "foo")
require_True(t, m.Header.Get(JSStream) == "DSMG")
require_True(t, m.Header.Get(JSSequence) == "1")
require_True(t, m.Header.Get(JSSubject) == "foo")
require_True(t, m.Subject != "foo")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)

m = getMsg(&JSApiMsgGetRequest{LastFor: "bar"})
require_True(t, string(m.Data) == "bar")
require_True(t, m.Header.Get(JSStream) == "DSMG")
require_True(t, m.Header.Get(JSSequence) == "2")
require_True(t, m.Header.Get(JSSubject) == "bar")
require_True(t, m.Subject != "bar")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)

m = getMsg(&JSApiMsgGetRequest{LastFor: "baz"})
require_True(t, string(m.Data) == "baz")
require_True(t, m.Header.Get(JSStream) == "DSMG")
require_True(t, m.Header.Get(JSSequence) == "3")
require_True(t, m.Header.Get(JSSubject) == "baz")
require_True(t, m.Subject != "baz")
require_True(t, m.Header.Get(JSTimeStamp) != _EMPTY_)

// Test error conditions

// Nil request
m = getMsg(nil)
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "408")
require_True(t, m.Header.Get("Description") == "Empty Request")

// Empty request
m = getMsg(&JSApiMsgGetRequest{})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "408")
require_True(t, m.Header.Get("Description") == "Empty Request")

// Both set
m = getMsg(&JSApiMsgGetRequest{Seq: 1, LastFor: "foo"})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "408")
require_True(t, m.Header.Get("Description") == "Bad Request")

// Not found
m = getMsg(&JSApiMsgGetRequest{LastFor: "foobar"})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "404")
require_True(t, m.Header.Get("Description") == "Message Not Found")

m = getMsg(&JSApiMsgGetRequest{Seq: 22})
require_True(t, len(m.Data) == 0)
require_True(t, m.Header.Get("Status") == "404")
require_True(t, m.Header.Get("Description") == "Message Not Found")
}

///////////////////////////////////////////////////////////////////////////
// Simple JetStream Benchmarks
///////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -18047,10 +18140,59 @@ func TestJetStreamKVMemoryStorePerf(t *testing.T) {

start = time.Now()
for i := 0; i < 100_000; i++ {
_, err := kv.PutString(fmt.Sprintf("foo.%d", i), "HELLO")
_, err := kv.PutString(fmt.Sprintf("foo.%d", i), "HELLO WORLD")
require_NoError(t, err)
}
fmt.Printf("Took %v for second run\n", time.Since(start))

start = time.Now()
for i := 0; i < 100_000; i++ {
_, err := kv.Get(fmt.Sprintf("foo.%d", i))
require_NoError(t, err)
}
fmt.Printf("Took %v for get\n", time.Since(start))
}

func TestJetStreamKVMemoryStoreDirectGetPerf(t *testing.T) {
// Comment out to run, holding place for now.
t.SkipNow()

s := RunBasicJetStreamServer()
if config := s.JetStreamConfig(); config != nil {
defer removeDir(t, config.StoreDir)
}
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

cfg := &StreamConfig{
Name: "TEST",
Storage: MemoryStorage,
Subjects: []string{"foo.*"},
MaxMsgsPer: 1,
AllowDirect: true,
}
addStream(t, nc, cfg)

start := time.Now()
for i := 0; i < 100_000; i++ {
_, err := js.Publish(fmt.Sprintf("foo.%d", i), []byte("HELLO"))
require_NoError(t, err)
}
fmt.Printf("Took %v for put\n", time.Since(start))

getSubj := fmt.Sprintf(JSDirectMsgGetT, "TEST")

const tmpl = "{\"last_by_subj\":%q}"

start = time.Now()
for i := 0; i < 100_000; i++ {
req := []byte(fmt.Sprintf(tmpl, fmt.Sprintf("foo.%d", i)))
_, err := nc.Request(getSubj, req, time.Second)
require_NoError(t, err)
}
fmt.Printf("Took %v for get\n", time.Since(start))
}

func TestJetStreamMultiplePullPerf(t *testing.T) {
Expand Down
80 changes: 79 additions & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type StreamConfig struct {
// AllowRollup allows messages to be placed into the system and purge
// all older messages using a special msg header.
AllowRollup bool `json:"allow_rollup_hdrs"`
// Allow higher peformance, direct access to get individual messages.
AllowDirect bool `json:"allow_direct,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so this option makes it so that KV would have to examine not only the server version but the stream info to know if this feature is enabled. Why does this feature need stream opt-in, but the other doesn't?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You would only get true here on streams where its opted in AND the server is the correct version.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct it is actually simpler, but the point is that we have to enable some option to enable the API, but we didn't for the other, so what is so special about this one?

Copy link
Contributor

@ripienaar ripienaar May 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From discussions with Derek - but not yet from reading the whole PR - this sets up a new subject where you can request data bypassing various expensive encodings, audits and things. It's opt in so these things dont listen on all servers etc. Crucially also bypasses the all-servers system import of $JS.API.> - hence "direct"

I have concerns about upgrading to the new model and breaking old clients - KV is now WIDELY used - so this is opt in behavior and behavior appropriately versions client libraries can detect and choose to use if they wish. There are also concerns about breaking existing deployments wrt we are adding new subjects and their ACLs might not cater for them meaning the client either breaks or we now have a new way to exfil data users dont know about. Not to mention cross account/domain issues.

Feature is there to make N million clients work rather than your garden variety day to day, and will have complexities cross accounts/leafnodes etc being on a subject outside of $JS.API.> (all the domain denies etc).

So given all above, its opt-in for the moment.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct, only new servers will return stream info with this set to true. Clients will only be able to use direct method if this is set to true, otherwise use current behavior through $JS.API route.

}

// RePublish is for republishing messages once committed to a stream.
Expand Down Expand Up @@ -265,10 +267,12 @@ const (
JSResponseType = "Nats-Response-Type"
)

// Headers for republished messages.
// Headers for republished messages and direct gets.
const (
JSStream = "Nats-Stream"
JSSequence = "Nats-Sequence"
JSTimeStamp = "Nats-Time-Stamp"
JSSubject = "Nats-Subject"
JSLastSequence = "Nats-Last-Sequence"
)

Expand Down Expand Up @@ -2844,6 +2848,12 @@ func (mset *stream) subscribeToStream() error {
return err
}
}
if mset.cfg.AllowDirect {
dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
if _, err := mset.subscribeInternal(dsubj, mset.processDirectGetRequest); err != nil {
return err
}
}

mset.active = true
return nil
Expand Down Expand Up @@ -2880,6 +2890,9 @@ func (mset *stream) unsubscribeToStream() error {
mset.stopSourceConsumers()
}

// In case we had a direct get subscription.
mset.unsubscribeInternal(fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name))

mset.active = false
return nil
}
Expand Down Expand Up @@ -3163,6 +3176,71 @@ func (mset *stream) queueInboundMsg(subj, rply string, hdr, msg []byte) {
mset.queueInbound(mset.msgs, subj, rply, hdr, msg)
}

// processDirectGetRequest handles direct get request for stream messages.
func (mset *stream) processDirectGetRequest(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
_, msg := c.msgParts(rmsg)
if len(reply) == 0 {
return
}
if len(msg) == 0 {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
var req JSApiMsgGetRequest
err := json.Unmarshal(msg, &req)
if err != nil {
hdr := []byte("NATS/1.0 408 Malformed Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
// Check if nothing set.
if req.Seq == 0 && req.LastFor == _EMPTY_ {
hdr := []byte("NATS/1.0 408 Empty Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}
// Check that we do not have both options set.
if req.Seq > 0 && req.LastFor != _EMPTY_ {
hdr := []byte("NATS/1.0 408 Bad Request\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

var svp StoreMsg
var sm *StoreMsg

mset.mu.RLock()
store, name := mset.store, mset.cfg.Name
mset.mu.RUnlock()

if req.Seq > 0 {
sm, err = store.LoadMsg(req.Seq, &svp)
} else {
sm, err = store.LoadLastMsg(req.LastFor, &svp)
}
if err != nil {
hdr := []byte("NATS/1.0 404 Message Not Found\r\n\r\n")
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
return
}

hdr := sm.hdr
ts := time.Unix(0, sm.ts).UTC()

if len(hdr) == 0 {
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %v\r\n\r\n"
hdr = []byte(fmt.Sprintf(ht, name, sm.subj, sm.seq, ts))
} else {
hdr = copyBytes(hdr)
hdr = genHeader(hdr, JSStream, name)
hdr = genHeader(hdr, JSSubject, sm.subj)
hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))
hdr = genHeader(hdr, JSTimeStamp, ts.String())
}
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))
}

// processInboundJetStreamMsg handles processing messages bound for a stream.
func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) {
mset.mu.RLock()
Expand Down