Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis streams #375

Open
samwhitecoull opened this issue Nov 20, 2018 · 6 comments
Open

Redis streams #375

samwhitecoull opened this issue Nov 20, 2018 · 6 comments

Comments

@samwhitecoull
Copy link

I am using Redigo in a project that relies on reading messages from streams. I know this is a newer feature of Redis, but is this something that Redigo plans to support natively in the future? I have cobbled together a stream reader by traversing through the nested interfaces returned from the Do() call and using type assertion (based on trial and improvement and knowledge of whats being sent) to pull useful results out. However this is extremely brittle and only works because I can guarantee whats being appended to the streams. If a code update is not required it would be great to see some documentation showing an idiomatic way of reading streams.

@smartwalle
Copy link

r, err := redis.Values(conn.Do("XREAD", "STREAMS", "stream1", "stream2", "0-0", "0-0"))

for kIndex :=0; kIndex < len(r); kIndex++ {
	var keyInfo = r[kIndex].([]interface{})

	var key = string(keyInfo[0].([]byte))
	var idList = keyInfo[1].([]interface{})

	for idIndex :=0; idIndex <len(idList); idIndex++ {
		var idInfo = idList[idIndex].([]interface{})

		var id = string(idInfo[0].([]byte))

		var fieldList = idInfo[1].([]interface{})
		var field = string(fieldList[0].([]byte))
		var value = string(fieldList[1].([]byte))

		fmt.Println(key, id, field, value)
	}
}

@samisagit
Copy link

apologies for the radio silence, life got busy! I'm happy to tackle this feature if the main man wants it implemented as part of the API - @garyburd, otherwise I'll close this and carry on using the wrapper I mentioned.

@nskforward
Copy link

I should have an ability to deserialize XREAD response to a struct{}
Currently I don't know how I can do it

@Explosivv
Copy link

any one merge this? this is helpful :)

@stevenh
Copy link
Collaborator

stevenh commented Jun 21, 2021

This is a issue not a PR, are your referring to #557 ?

@ostcar
Copy link

ostcar commented Nov 6, 2022

I hope a solution for this problem will be found soon.

But the solution has to consider, that a stream entry is not a map. It can contain the same field multiple times. For example:

127.0.0.1:6379> xadd test * field value1 field value2
"1667730787485-0"
127.0.0.1:6379> xread streams test 0
1) 1) "test"
   2) 1) 1) "1667730787485-0"
         2) 1) "field"
            2) "value1"
            3) "field"
            4) "value2"

The PR #557 parses the fields in a map. So it would return

map[string]string{"field":"value"}

I think it would be nice, if the the user would be able to parse the fields as he wants.

For example:

func parseStream(reply any, f func(k, v []byte)) (string, error) {
	valueList, err := redis.Values(reply, nil)
	if err != nil {
		return "", err
	}

	var lastID string
	for i, value := range valueList {
		idFields, ok := value.([]any)
		if !ok || len(idFields) != 2 {
			return "", fmt.Errorf("invalid stream value %d, got %v", i, value)
		}

		id, err := redis.String(idFields[0], nil)
		if err != nil {
			return "", fmt.Errorf("parsing id from entry %d: %w", i, err)
		}

		lastID = id

		fieldList, ok := idFields[1].([]any)
		if !ok || len(fieldList)%2 != 0 {
			return "", fmt.Errorf("invalid field list value %d, got %v", i, idFields[i])
		}

		for fi := 0; fi < len(fieldList); fi += 2 {
			key, ok := toByte(fieldList[fi])
			if !ok {
				return "", fmt.Errorf("field %d in entry %d is not a bulk string value, got %T", fi, i, fieldList[fi])
			}

			value, ok := toByte(fieldList[fi+1])
			if !ok {
				return "", fmt.Errorf("value %d in entry %d is not a bulk string value, got %T", fi+1, i, fieldList[fi])
			}

			f(key, value)
		}
	}
	return lastID, nil
}

// user function1 that builds a map
data := make(map[string]string)
lastID, err :=parseStream(reply, func(k,v []byte) {data[string(k)]=string(v)})

// user function2 that only wants the values
var data []string
lastID, err :=parseStream(reply, func(_,v []byte) {data=append(data,string(v)})

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

8 participants