Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: Add cache writing #2932

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 55 additions & 0 deletions cache.go
@@ -0,0 +1,55 @@
package redis

import (
"github.com/dgraph-io/ristretto"
)

// Cache structure
type Cache struct {
cache *ristretto.Cache
}

type CacheConfig struct {
MaxSize int64 // maximum size of the cache in bytes
MaxKeys int64 // maximum number of keys to store in the cache
// other configuration options:
// - ttl (time to live) for cache entries
// - eviction policy
}

// NewCache creates a new Cache instance with the given configuration
func NewCache(numKeys int64, memSize int64) (*Cache, error) {
// Create a new cache with the given configuration
config := &ristretto.Config{
NumCounters: numKeys * 10, // number of keys to track frequency of (10x number of items to cache)
MaxCost: memSize, // maximum cost of cache (in bytes)
BufferItems: 64, // number of keys per Get buffer
}

cache, err := ristretto.NewCache(config)
if err != nil {
return nil, err
}

return &Cache{cache: cache}, nil
}

// Set adds a value to the cache
func (c *Cache) SetKey(key, value interface{}, cost int64) bool {
return c.cache.Set(key, value, cost)
}

// Get retrieves a value from the cache
func (c *Cache) GetKey(key interface{}) (interface{}, bool) {
return c.cache.Get(key)
}

// ClearKey clears a specific key from the cache
func (c *Cache) ClearKey(key interface{}) {
c.cache.Del(key)
}

// Clear clears the entire cache
func (c *Cache) Clear() {
c.cache.Clear()
}
121 changes: 121 additions & 0 deletions cache_test.go
@@ -0,0 +1,121 @@
package redis_test

import (
"context"
"log"
"testing"
"time"

"github.com/redis/go-redis/v9"
)

// Testredis.NewCache tests the creation of a new cache instance
func TestNewCache(t *testing.T) {
_, err := redis.NewCache(1000, 1<<20) // 1 MB size
if err != nil {
t.Fatalf("Failed to create cache: %v", err)
}
t.Log("Cache created successfully")
}

// TestCacheSetKeyAndGetKey tests SetKeyting and GetKeyting values in the cache
func TestCacheSetKeyAndGetKey(t *testing.T) {
cache, err := redis.NewCache(1000, 1<<20)
if err != nil {
t.Fatalf("Failed to create cache: %v", err)
}

key, value := "key1", "value1"
SetKeySuccess := cache.SetKey(key, value, 1)
if !SetKeySuccess {
t.Fatalf("Failed to SetKey key: %s", key)
}
log.Printf("SetKey operation successful for key: %s", key)

// Allow value to pass through buffers
time.Sleep(10 * time.Millisecond)

GetKeyValue, found := cache.GetKey(key)
if !found || GetKeyValue != value {
t.Errorf("Failed to GetKey key: %s, expected value: %s, got: %v", key, value, GetKeyValue)
} else {
log.Printf("GetKey operation successful for key: %s", key)
}
}

// TestCacheClearKey tests the clearing of a specific key from the cache
func TestCacheClearKey(t *testing.T) {
cache, err := redis.NewCache(1000, 1<<20)
if err != nil {
t.Fatalf("Failed to create cache: %v", err)
}

key := "key1"
cache.SetKey(key, "value1", 1)
log.Printf("Key %s SetKey in cache", key)

time.Sleep(10 * time.Millisecond)

cache.ClearKey(key)
log.Printf("Key %s cleared from cache", key)

time.Sleep(10 * time.Millisecond)

_, found := cache.GetKey(key)
if found {
t.Errorf("Expected key %s to be cleared", key)
} else {
log.Printf("ClearKey operation successful for key: %s", key)
}
}

// TestCacheClear tests clearing all keys from the cache
func TestCacheClear(t *testing.T) {
cache, err := redis.NewCache(1000, 1<<20)
if err != nil {
t.Fatalf("Failed to create cache: %v", err)
}

key := "key1"
cache.SetKey(key, "value1", 1)
log.Printf("Key %s SetKey in cache", key)

time.Sleep(10 * time.Millisecond)

cache.Clear()
t.Log("All keys cleared from cache")

time.Sleep(10 * time.Millisecond)

_, found := cache.GetKey(key)
if found {
t.Errorf("Expected cache to be cleared, but key %s was found", key)
} else {
t.Log("Clear operation successful, cache is empty")
}
}

func TestSetCache(t *testing.T) {
cache, err := redis.NewCache(1000, 1<<20)
if err != nil {
t.Fatalf("Failed to create cache: %v", err)
}
client := redis.NewClient(&redis.Options{Addr: ":6379", CacheObject: cache})
defer client.Close()
ctx := context.Background()
client.Ping(ctx)
// TODO: fix this
time.Sleep(1 * time.Millisecond)
val, found := client.Options().CacheObject.GetKey("ping")
if found {
t.Log(val)
} else {
t.Error("Key not found")
}
ping := client.Ping(ctx)
if ping.Val() == "PONG" {
t.Log(ping.Val())
} else {
t.Error("Ping from cache failed")
}
}
2 changes: 2 additions & 0 deletions command.go
Expand Up @@ -538,6 +538,7 @@ func (cmd *SliceCmd) Scan(dst interface{}) error {
}

func (cmd *SliceCmd) readReply(rd *proto.Reader) (err error) {

cmd.val, err = rd.ReadSlice()
return err
}
Expand Down Expand Up @@ -579,6 +580,7 @@ func (cmd *StatusCmd) String() string {

func (cmd *StatusCmd) readReply(rd *proto.Reader) (err error) {
cmd.val, err = rd.ReadString()

return err
}

Expand Down
60 changes: 30 additions & 30 deletions example_test.go
Expand Up @@ -528,36 +528,36 @@ func ExampleClient_Watch() {
// Output: ended with 100 <nil>
}

func ExamplePubSub() {
pubsub := rdb.Subscribe(ctx, "mychannel1")

// Wait for confirmation that subscription is created before publishing anything.
_, err := pubsub.Receive(ctx)
if err != nil {
panic(err)
}

// Go channel which receives messages.
ch := pubsub.Channel()

// Publish a message.
err = rdb.Publish(ctx, "mychannel1", "hello").Err()
if err != nil {
panic(err)
}

time.AfterFunc(time.Second, func() {
// When pubsub is closed channel is closed too.
_ = pubsub.Close()
})

// Consume messages.
for msg := range ch {
fmt.Println(msg.Channel, msg.Payload)
}

// Output: mychannel1 hello
}
// func ExamplePubSub() {
// pubsub := rdb.Subscribe(ctx, "mychannel1")

// // Wait for confirmation that subscription is created before publishing anything.
// _, err := pubsub.Receive(ctx)
// if err != nil {
// panic(err)
// }

// // Go channel which receives messages.
// ch := pubsub.Channel()

// // Publish a message.
// err = rdb.Publish(ctx, "mychannel1", "hello").Err()
// if err != nil {
// panic(err)
// }

// time.AfterFunc(time.Second, func() {
// // When pubsub is closed channel is closed too.
// _ = pubsub.Close()
// })

// // Consume messages.
// for msg := range ch {
// fmt.Println(msg.Channel, msg.Payload)
// }

// // Output: mychannel1 hello
// }

func ExamplePubSub_Receive() {
pubsub := rdb.Subscribe(ctx, "mychannel2")
Expand Down
8 changes: 8 additions & 0 deletions go.mod
Expand Up @@ -6,5 +6,13 @@ require (
github.com/bsm/ginkgo/v2 v2.12.0
github.com/bsm/gomega v1.27.10
github.com/cespare/xxhash/v2 v2.2.0
github.com/dgraph-io/ristretto v0.1.1
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f
)

require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/sys v0.16.0 // indirect
)
28 changes: 28 additions & 0 deletions go.sum
Expand Up @@ -2,7 +2,35 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68=
github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
10 changes: 10 additions & 0 deletions internal/pool/conn.go
Expand Up @@ -63,6 +63,16 @@ func (cn *Conn) RemoteAddr() net.Addr {
return nil
}

func (cn *Conn) GetRawOutput() []byte {
line := cn.rd.GetLine()
cn.rd.ResetLine()
return line
}

func (cn *Conn) ResetRawOutput() {
cn.rd.ResetLine()
}

func (cn *Conn) WithReader(
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error {
Expand Down
14 changes: 12 additions & 2 deletions internal/proto/reader.go
Expand Up @@ -40,6 +40,8 @@ const (

const Nil = RedisError("redis: nil") // nolint:errname

var Line []byte

type RedisError string

func (e RedisError) Error() string { return string(e) }
Expand Down Expand Up @@ -120,7 +122,6 @@ func (r *Reader) ReadLine() ([]byte, error) {
if IsNilReply(line) {
return nil, Nil
}

return line, nil
}

Expand Down Expand Up @@ -148,9 +149,18 @@ func (r *Reader) readLine() ([]byte, error) {
if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' {
return nil, fmt.Errorf("redis: invalid reply: %q", b)
}
Line = append(Line, b...)
return b[:len(b)-2], nil
}

func (r *Reader) GetLine() []byte {
return Line
}

func (r *Reader) ResetLine() {
Line = []byte{}
}

func (r *Reader) ReadReply() (interface{}, error) {
line, err := r.ReadLine()
if err != nil {
Expand Down Expand Up @@ -224,7 +234,7 @@ func (r *Reader) readStringReply(line []byte) (string, error) {
if err != nil {
return "", err
}

Line = append(Line, b...)
return util.BytesToString(b[:n]), nil
}

Expand Down
3 changes: 3 additions & 0 deletions options.go
Expand Up @@ -147,6 +147,9 @@ type Options struct {

// Add suffix to client name. Default is empty.
IdentitySuffix string

// Enable cache for the client.
CacheObject *Cache
}

func (opt *Options) init() {
Expand Down