Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add parsing for stream entries #557

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open

Conversation

smotes
Copy link

@smotes smotes commented Mar 24, 2021

This PR is related to issue #375 and adds a first pass at parsing replies containing stream entries into a struct.

I tested things against a Redis server v5.0.7, which is the first version containing the streams feature: https://redislabs.com/blog/redis-5-0-is-here/.

Copy link
Collaborator

@stevenh stevenh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking at this.

I've done a quick pass and adding some initial feedback

redis/reply_test.go Outdated Show resolved Hide resolved
redis/reply_test.go Outdated Show resolved Hide resolved
redis/reply_test.go Outdated Show resolved Hide resolved
@smotes
Copy link
Author

smotes commented Mar 26, 2021

Thanks for looking at this.

I've done a quick pass and adding some initial feedback

See commit 4a86003.

@stevenh stevenh mentioned this pull request Jun 21, 2021
@tk42
Copy link

tk42 commented Jul 31, 2021

Is this working for XREAD or XREADGROUP? I wonder this is not working well for them because the nested structure is different, sorry if this is out of scope.

@stevenh
Copy link
Collaborator

stevenh commented Sep 30, 2021

Is this working for XREAD or XREADGROUP? I wonder this is not working well for them because the nested structure is different, sorry if this is out of scope.

You raise a valid question @tk42 it does look like this will only work for XRANGE which I don't think is clear from the definition of Entries. So I wonder if there is a better way to do this which is more generic.

Any thoughts @smotes ?

@smotes
Copy link
Author

smotes commented Jan 7, 2022

Hi @stevenh.

I propose we keep things as they stand. The Redis docs distinguish between entries and stream names in return types, so I felt it on the caller to parse out the reply in parts and know where to find the nested entries within.

Consider the XREADGROUP Redis command docs (https://redis.io/commands/xreadgroup#return-value).

Array reply, specifically:
The command returns an array of results: each element of the returned array is an array composed of a two element containing the key name and the entries reported for that key. The entries reported are full stream entries, having IDs and the list of all the fields and values. Field and values are guaranteed to be reported in the same order they were added by XADD.

And here's a little code snippet showing how to parse that as is (adapted from some code in a personal project).

reply, err := sc.conn.Do("XREADGROUP", "GROUP", "some-group-name", "some-consumer-name", 
	"BLOCK", 3000, "COUNT", 10, "STREAMS", "some-stream-name", ">")
vs, err := redis.Values(reply, err)
if err != nil {
	if errors.Is(err, redis.ErrNil) {
		continue
	}
	return err
}

// Get the first and only value in the array since we're only
// reading from one stream "some-stream-name" here.
vs, err = redis.Values(vs[0], nil)
if err != nil {
	return err
}

// Ignore the stream name as the first value as we already have 
// that in hand! Just get the second value which is guaranteed to 
// exist per the docs, and parse it as some stream entries.
entries, err := ParseEntries(vs[1], nil)
if err != nil {
	return fmt.Errorf("error parsing entries: %w", err)
}

@stevenh
Copy link
Collaborator

stevenh commented Jan 7, 2022

That makes sense to me @tk42 does that answer address your concerns?

@tk42
Copy link

tk42 commented Jan 8, 2022

Hi @smotes @stevenh
In that snippet, it contains only a single stream key but how about multiple stream key case? From redis docs, XREAD/XREADGROUP can take multiple keys (and corresponded ids).

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]

I’m feeling that users for XREAD/XREADGROUP will call redis.Values multiple times as a template pattern, so a new struct of return-type for them might help those users. Something like this ;)

type StreamEntry struct {
    Stream       string
    Entries      []Entry
}

and its corresponded parsing function like this

func StreamEntries(reply interface{}, err error) ([]StreamEntry, error) {
    vss, err := redis.Values(reply, err)
    if err != nil {
        // error handling
	    return nil, fmt.Errorf("error parsing reply: %w", err)
    }

    var streamEntries []StreamEntry
    for _, vs := range vss {
        name, err := redis.String(vs[0], nil)
        if err != nil {
            // error handling
            streamEntries = append(streamEntries, new(StreamEntry))
        }

        entries, err := ParseEntries(vs[1], nil)
        if err != nil {
            // error handling
            streamEntries = append(streamEntries, new(StreamEntry))
        }

        streamEntries = append(streamEntries, StreamEntry{
               Stream: name,
               Entries: entries,
        })
    }
    return streamEntries, nil
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants