ReceiveWithTimeout is closing network connections - support for interruptable Receive()? #552

Closed
SnirkImmington opened this issue Feb 3, 2021 · 1 comment

SnirkImmington commented Feb 3, 2021

I'm not very familiar with asynchronous Go development, so I'll try to explain what I'm doing in case the issue is my use of the library. I'm developing a webserver for a chat client. A natural use for Redis here is to send new chat messages over a pub/sub channel. The client establishes a WebSocket connection (handleConnection) to the server, and the server subscribes to a Redis channel and streams over responses (handleSubscription). Here is a highly simplified example of what the code could look like:

func handleConnection(req Request) {
    // Cancelled on ctrl-c or on rotation in prod, so that cleanup() always runs.
    ctx := getShutdownContext()
    // Basically, I need a way to signal handleSubscription to stop, be it context or channel.
    messages := make(chan string)
    go handleSubscription(ctx, messages)
    defer cleanup()
    for {
        select {
        case msg := <-messages:
            req.Send(msg)
        // More cases here to respond to client closing the connection
        case <-ctx.Done():
            extraShutdownStuff()
            return
        }
    }
}

func handleSubscription(ctx context.Context, messages chan<- string) {
    conn := redis.PubSubConn{Conn: redisPool.Get()}
    defer cleanup()

    for {
        select {
        case <-ctx.Done(): // Close subscription if client disconnects
            // We got asked to quit by handleConnection()
            return
        default:
            // Not cancelled yet; fall through to Receive()
        }
        // I think we also check if the request handler closed the `messages` channel here

        // **This blocks forever if all users leave the channel**
        switch conn.Receive().(type) {
            // ... regular pub/sub handling
        }
    }
}

Like I said, the actual code is more involved and does error handling, but my question is: how do I signal the subscription loop to stop after the client disconnects? The code shown above only checks ctx.Done() after Receive() stops blocking, which might never happen if the channel never receives a new message.

I've tried using conn.ReceiveWithTimeout(), but the underlying connection ends up closed after one pass through the loop:

subscribe.go: Error from ReceiveWithTimeout: read tcp 127.0.0.1:39430->127.0.0.1:6379: use of closed network connection

I've tried using the regular connection from Pool.Get() instead of the pub/sub one, but that seems to have the same issue. I could just handle the error by dialing a new connection, but that seems a little wasteful, and it could result in dropped messages. I could also publish a Redis message that handleSubscription() would check for and exit on, but then my ctrl-c handler is round-tripping through Redis just to ask a goroutine to stop.

As long as there is no way for Receive() to be non-blocking, I'm unable to stop the goroutine and prevent a memory leak. (If there isn't a separate goroutine, then we can't respond to client disconnects at all.) I noticed that #537 adds ReceiveContext(), but I don't think it would fix this issue, as it just calls ReceiveWithTimeout() in another goroutine, which seems to inevitably close the underlying network connection.

stevenh (Collaborator) commented Feb 3, 2021

You need to unsubscribe, which will make the pub sub connection return a redis.Subscription instead of redis.Message.

Please note that issues are for bugs and not help using the library, something like stackoverflow.com is better for that.

stevenh closed this as completed Feb 3, 2021