Skip to content

Commit

Permalink
feat: Allow for pubsub connections to optionally be taken from pool
Browse files Browse the repository at this point in the history
- After use, they are removed from the pool
- Only applicable to non-cluster client
  • Loading branch information
vtermanis committed Jan 29, 2024
1 parent 2512123 commit 49e1dc8
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 6 deletions.
5 changes: 5 additions & 0 deletions options.go
Expand Up @@ -147,6 +147,11 @@ type Options struct {

// Add suffix to client name. Default is empty.
IdentitySuffix string

// Use connections from pool instead of creating new ones. Note that after use these connections will not be
// returned to the pool. Useful for managing the total Redis connection limit for a mix of Pubsub & other commands.
// Applies only to non-cluster client. Default is false.
PubsubFromPool bool
}

func (opt *Options) init() {
Expand Down
7 changes: 5 additions & 2 deletions pubsub.go
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/redis/go-redis/v9/internal/proto"
)

type PubsubNewConnFunc func(ctx context.Context, channels []string) (*pool.Conn, error)
type PubsubCloseConnFunc func(*pool.Conn) error

// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
Expand All @@ -21,8 +24,8 @@ import (
type PubSub struct {
opt *Options

newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
closeConn func(*pool.Conn) error
newConn PubsubNewConnFunc
closeConn PubsubCloseConnFunc

mu sync.Mutex
cn *pool.Conn
Expand Down
100 changes: 100 additions & 0 deletions pubsub_test.go
Expand Up @@ -567,4 +567,104 @@ var _ = Describe("PubSub", func() {
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal(text))
})

It("should not use connections from pool", func() {
statsBefore := client.PoolStats()

pubsub := client.Subscribe(ctx, "mychannel")
defer pubsub.Close()

stats := client.PoolStats()
// A connection has been created
Expect(stats.TotalConns - statsBefore.TotalConns).To(Equal(uint32(1)))
// But it's not taken from the pool
poolFetchesBefore := statsBefore.Hits + statsBefore.Misses
poolFetchesAfter := stats.Hits + stats.Misses
Expect(poolFetchesAfter - poolFetchesBefore).To(Equal(uint32(0)))

pubsub.Close()

stats = client.PoolStats()
// The connection no longer exists
Expect(stats.TotalConns - statsBefore.TotalConns).To(Equal(uint32(0)))
Expect(stats.IdleConns - statsBefore.IdleConns).To(Equal(uint32(0)))
})
})

var _ = Describe("PubSub with PubsubFromPool set", func() {
var client *redis.Client

BeforeEach(func() {
opt := redisOptions()
opt.MinIdleConns = 0
opt.ConnMaxLifetime = 0
opt.PubsubFromPool = true
// zero value ends up using default so set small instead
opt.PoolTimeout = time.Microsecond
client = redis.NewClient(opt)
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should use connection from pool", func() {
statsBefore := client.PoolStats()

pubsub := client.Subscribe(ctx, "mychannel")
defer pubsub.Close()

stats := client.PoolStats()
// A connection has been taken from the pool
Expect(stats.Hits - statsBefore.Hits).To(Equal(uint32(1)))
statsDuring := client.PoolStats()

pubsub.Close()

stats = client.PoolStats()
// It's not returned to the idle pool ..
Expect(statsDuring.IdleConns - stats.IdleConns).To(Equal(uint32(0)))
// .. and has been terminated
Expect(statsDuring.TotalConns - stats.TotalConns).To(Equal(uint32(1)))
})

It("should respect pool size limit", func() {
poolSize := client.Options().PoolSize
statsBefore := client.PoolStats()

var pubsubs []*redis.PubSub
for i := 0; i < poolSize; i++ {
pubsub := client.Subscribe(ctx, "mychannel")
defer pubsub.Close()
pubsubs = append(pubsubs, pubsub)
}

statsDuring := client.PoolStats()
poolFetchesBefore := statsBefore.Hits + statsBefore.Misses
poolFetchesAfter := statsDuring.Hits + statsDuring.Misses

// A total of poolSize connections should have been taken from the pool (new or existing)
Expect(poolFetchesAfter - poolFetchesBefore).To(Equal(uint32(poolSize)))

// The next pubsub connection should fail to connect (waiting for pool)
extraPubsub := client.Subscribe(ctx, "mychannel")
defer extraPubsub.Close()
Expect(client.PoolStats().Timeouts - statsDuring.Timeouts).To(Equal(uint32(1)))

// As should retries
err := extraPubsub.Ping(ctx)
Expect(err).To(MatchError(ContainSubstring("connection pool timeout")))
Expect(client.PoolStats().Timeouts - statsDuring.Timeouts).To(Equal(uint32(2)))

for _, pubsub := range pubsubs {
pubsub.Close()
}

stats := client.PoolStats()
// Connections are not returned to the idle pool ..
Expect(statsDuring.IdleConns - stats.IdleConns).To(Equal(uint32(0)))
// .. and have been terminated
Expect(statsDuring.TotalConns - stats.TotalConns).To(Equal(uint32(poolSize)))
})
})
38 changes: 34 additions & 4 deletions redis.go
Expand Up @@ -199,6 +199,9 @@ type baseClient struct {
opt *Options
connPool pool.Pooler

pubsubNewConn PubsubNewConnFunc
pubsubCloseConn PubsubCloseConnFunc

onClose func() error // hook called when client is closed
}

Expand Down Expand Up @@ -368,6 +371,13 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
}
}

func (c *baseClient) removeConn(ctx context.Context, cn *pool.Conn, err error) {
if c.opt.Limiter != nil {
c.opt.Limiter.ReportResult(err)
}
c.connPool.Remove(ctx, cn, err)
}

func (c *baseClient) withConn(
ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
Expand Down Expand Up @@ -649,6 +659,28 @@ func (c *Client) init() {
pipeline: c.baseClient.processPipeline,
txPipeline: c.baseClient.processTxPipeline,
})

if c.opt.PubsubFromPool {
// Take connections from pool and remove them from pool afterwards. (Pubsub & other connections are managed
// together.)
c.pubsubNewConn = func(ctx context.Context, channels []string) (*pool.Conn, error) {
return c.getConn(ctx)
}
c.pubsubCloseConn = func(conn *pool.Conn) error {
c.removeConn(context.TODO(), conn, nil)
return nil
}
} else {
// Make brand new connection from pool and close it afterwards. (Pubsub & other connections are managed
// independently other than that pubsub connection can no longer be created once the pool is full.)
c.pubsubNewConn = func(ctx context.Context, channels []string) (*pool.Conn, error) {
return c.newConn(ctx)
}
// wrapping in closure since pool has not been initialised yet
c.pubsubCloseConn = func(conn *pool.Conn) error {
return c.connPool.CloseConn(conn)
}
}
}

func (c *Client) WithTimeout(timeout time.Duration) *Client {
Expand Down Expand Up @@ -720,10 +752,8 @@ func (c *Client) pubSub() *PubSub {
pubsub := &PubSub{
opt: c.opt,

newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
return c.newConn(ctx)
},
closeConn: c.connPool.CloseConn,
newConn: c.pubsubNewConn,
closeConn: c.pubsubCloseConn,
}
pubsub.init()
return pubsub
Expand Down

0 comments on commit 49e1dc8

Please sign in to comment.