diff --git a/go.mod b/go.mod index b9419895522..51d62fa52d5 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect github.com/dhowden/tag v0.0.0-20201120070457-d52dcb253c63 github.com/dustin/go-humanize v1.0.0 - github.com/go-redis/redis/v7 v7.4.1 + github.com/go-redis/redis/v8 v8.11.3 github.com/gofrs/uuid v4.0.0+incompatible github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f @@ -33,8 +33,6 @@ require ( github.com/mssola/user_agent v0.5.3 github.com/ncw/swift v1.0.53 github.com/nightlyone/lockfile v1.0.0 - github.com/onsi/ginkgo v1.15.0 // indirect - github.com/onsi/gomega v1.10.5 // indirect github.com/oschwald/maxminddb-golang v1.8.0 github.com/pquerna/otp v1.3.0 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index 3fc9ce52ad7..a8d6c2f228b 100644 --- a/go.sum +++ b/go.sum @@ -107,6 +107,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dhowden/tag v0.0.0-20201120070457-d52dcb253c63 h1:/u5RVRk3Nh7Zw1QQnPtUH5kzcc8JmSSRpHSlGU/zGTE= github.com/dhowden/tag v0.0.0-20201120070457-d52dcb253c63/go.mod h1:SniNVYuaD1jmdEEvi+7ywb1QFR7agjeTdGKyFb0p7Rw= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= @@ -139,10 +141,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= -github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI= -github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/go-redis/redis/v8 v8.11.3 h1:GCjoYp8c+yQTJfc0n69iwSiHjvuAdruxl7elnZCxgt8= +github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc= github.com/go-stack/stack v1.6.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -362,18 +365,17 @@ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks= github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM= github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAmxBiA= github.com/nightlyone/lockfile v1.0.0/go.mod h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= -github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= -github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= -github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= -github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= -github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= +github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU= +github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0= github.com/oschwald/maxminddb-golang v1.8.0 h1:Uh/DSnGoxsyp/KYbY1AuP0tYEwfs0sCph9p/UMXK/Hk= github.com/oschwald/maxminddb-golang v1.8.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= @@ -573,12 +575,12 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211008194852-3b03d305991f h1:1scJEYZBaF48BaG6tYbtxmLcXqwYGSfGcMoStTqkkIw= @@ -639,7 +641,6 @@ golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/model/job/redis_broker.go b/model/job/redis_broker.go index 59c39c983b5..3d47c47beae 100644 --- a/model/job/redis_broker.go +++ b/model/job/redis_broker.go @@ -11,7 +11,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/limits" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" multierror "github.com/hashicorp/go-multierror" "github.com/sirupsen/logrus" ) @@ -25,6 +25,7 @@ const ( type redisBroker struct { client redis.UniversalClient + ctx context.Context workers []*Worker workersRunning []*Worker workersTypes []string @@ -37,6 +38,7 @@ type redisBroker struct { func NewRedisBroker(client redis.UniversalClient) Broker { return &redisBroker{ client: client, + ctx: context.Background(), closed: make(chan struct{}), } } @@ -150,7 +152,7 @@ func (b *redisBroker) pollLoop(key string, ch chan<- *Job) { if rng.Intn(3) == 0 { keyP1, keyP0 = keyP0, keyP1 } - results, err := b.client.BRPop(redisBRPopTimeout, keyP0, keyP1).Result() + results, err := b.client.BRPop(b.ctx, redisBRPopTimeout, keyP0, keyP1).Result() if err != nil || len(results) < 2 { time.Sleep(100 * time.Millisecond) continue @@ -243,7 +245,7 @@ func (b *redisBroker) PushJob(db prefixer.Prefixer, req *JobRequest) (*Job, erro key += redisHighPrioritySuffix } - if err := b.client.LPush(key, val).Err(); err != nil { + if err := b.client.LPush(b.ctx, key, val).Err(); err != nil { return nil, err } @@ -254,11 +256,11 @@ func (b *redisBroker) PushJob(db prefixer.Prefixer, req *JobRequest) (*Job, erro // specified worker type. func (b *redisBroker) WorkerQueueLen(workerType string) (int, error) { key := redisPrefix + workerType - l1, err := b.client.LLen(key).Result() + l1, err := b.client.LLen(b.ctx, key).Result() if err != nil { return 0, err } - l2, err := b.client.LLen(key + redisHighPrioritySuffix).Result() + l2, err := b.client.LLen(b.ctx, key+redisHighPrioritySuffix).Result() if err != nil { return 0, err } diff --git a/model/job/redis_broker_test.go b/model/job/redis_broker_test.go index 9624af2137e..563300ce291 100644 --- a/model/job/redis_broker_test.go +++ b/model/job/redis_broker_test.go @@ -12,7 +12,7 @@ import ( "github.com/cozy/cozy-stack/model/job" jobs "github.com/cozy/cozy-stack/model/job" "github.com/cozy/cozy-stack/pkg/limits" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) diff --git a/model/job/redis_scheduler.go b/model/job/redis_scheduler.go index c2209740742..c771498219e 100644 --- a/model/job/redis_scheduler.go +++ b/model/job/redis_scheduler.go @@ -16,7 +16,7 @@ import ( "github.com/cozy/cozy-stack/pkg/logger" "github.com/cozy/cozy-stack/pkg/prefixer" "github.com/cozy/cozy-stack/pkg/realtime" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/sirupsen/logrus" ) @@ -57,6 +57,7 @@ return t` type redisScheduler struct { broker Broker client redis.UniversalClient + ctx context.Context closed chan struct{} stopped chan struct{} log *logrus.Entry @@ -67,6 +68,7 @@ type redisScheduler struct { func NewRedisScheduler(client redis.UniversalClient) Scheduler { return &redisScheduler{ client: client, + ctx: context.Background(), log: logger.WithNamespace("scheduler-redis"), stopped: make(chan struct{}), } @@ -136,7 +138,7 @@ func (s *redisScheduler) startEventDispatcher() { func (s *redisScheduler) eventLoop(eventsCh <-chan *realtime.Event) { for event := range eventsCh { key := eventsKey(event) - m, err := s.client.HGetAll(key).Result() + m, err := s.client.HGetAll(s.ctx, key).Result() if err != nil { s.log.Errorf("Could not fetch redis set %s: %s", key, err.Error()) @@ -170,7 +172,7 @@ func (s *redisScheduler) eventLoop(eventsCh <-chan *realtime.Event) { var d time.Duration if d, err = time.ParseDuration(et.Infos().Debounce); err == nil { timestamp := time.Now().Add(d) - s.client.ZAddNX(TriggersKey, &redis.Z{ + s.client.ZAddNX(s.ctx, TriggersKey, &redis.Z{ Score: float64(timestamp.UTC().Unix()), Member: redisKey(t), }) @@ -217,15 +219,15 @@ func (s *redisScheduler) fire(trigger Trigger, request *JobRequest) { pipe := s.client.Pipeline() switch trigger.CombineRequest() { case appendPayload: - pipe.RPush(payloadKey(trigger), string(request.Payload)) + pipe.RPush(s.ctx, payloadKey(trigger), string(request.Payload)) case keepOriginalRequest: - pipe.SetNX(payloadKey(trigger), string(request.Payload), 30*24*time.Hour) + pipe.SetNX(s.ctx, payloadKey(trigger), string(request.Payload), 30*24*time.Hour) } - pipe.ZAddNX(TriggersKey, &redis.Z{ + pipe.ZAddNX(s.ctx, TriggersKey, &redis.Z{ Score: float64(timestamp.UTC().Unix()), Member: redisKey(trigger), }) - if _, err := pipe.Exec(); err != nil { + if _, err := pipe.Exec(s.ctx); err != nil { s.log.Warnf("Cannot fire trigger because of redis error: %s", err) } } @@ -251,7 +253,7 @@ func (s *redisScheduler) ShutdownScheduler(ctx context.Context) error { func (s *redisScheduler) PollScheduler(now int64) error { keys := []string{strconv.FormatInt(now, 10)} for { - res, err := s.client.Eval(luaPoll, keys).Result() + res, err := s.client.Eval(s.ctx, luaPoll, keys).Result() if err != nil || res == nil { return err } @@ -264,7 +266,7 @@ func (s *redisScheduler) PollScheduler(now int64) error { } parts := strings.SplitN(results[0].(string), "/", 2) if len(parts) != 2 { - s.client.ZRem(SchedKey, results[0]) + s.client.ZRem(s.ctx, SchedKey, results[0]) return fmt.Errorf("Invalid key %s", res) } @@ -272,7 +274,7 @@ func (s *redisScheduler) PollScheduler(now int64) error { t, err := s.GetTrigger(prefixer.NewPrefixer("", prefix), parts[1]) if err != nil { if err == ErrNotFoundTrigger || err == ErrMalformedTrigger { - s.client.ZRem(SchedKey, results[0]) + s.client.ZRem(s.ctx, SchedKey, results[0]) } return err } @@ -280,23 +282,23 @@ func (s *redisScheduler) PollScheduler(now int64) error { case *EventTrigger, *WebhookTrigger: // Debounced job := t.Infos().JobRequest() job.Debounced = true - if err = s.client.ZRem(SchedKey, results[0]).Err(); err != nil { + if err = s.client.ZRem(s.ctx, SchedKey, results[0]).Err(); err != nil { return err } switch t.CombineRequest() { case appendPayload: pipe := s.client.Pipeline() - lrange := pipe.LRange(payloadKey(t), 0, -1) - pipe.Del(payloadKey(t)) - if _, err := pipe.Exec(); err == nil { + lrange := pipe.LRange(s.ctx, payloadKey(t), 0, -1) + pipe.Del(s.ctx, payloadKey(t)) + if _, err := pipe.Exec(s.ctx); err == nil { payloads := strings.Join(lrange.Val(), ",") job.Payload = Payload(`{"payloads":[` + payloads + "]}") } case keepOriginalRequest: pipe := s.client.Pipeline() - get := pipe.Get(payloadKey(t)) - pipe.Del(payloadKey(t)) - if _, err := pipe.Exec(); err == nil { + get := pipe.Get(s.ctx, payloadKey(t)) + pipe.Del(s.ctx, payloadKey(t)) + if _, err := pipe.Exec(s.ctx); err == nil { job.Payload = Payload(get.Val()) } } @@ -307,7 +309,7 @@ func (s *redisScheduler) PollScheduler(now int64) error { job := t.Infos().JobRequest() if _, err = s.broker.PushJob(t, job); err != nil { if limits.IsLimitReachedOrExceeded(err) { - s.client.ZRem(SchedKey, results[0]) + s.client.ZRem(s.ctx, SchedKey, results[0]) } return err } @@ -320,7 +322,7 @@ func (s *redisScheduler) PollScheduler(now int64) error { // Remove the cron trigger from redis if it is invalid, as it // may block other cron triggers if err == ErrUnknownWorker || limits.IsLimitReachedOrExceeded(err) { - s.client.ZRem(SchedKey, results[0]) + s.client.ZRem(s.ctx, SchedKey, results[0]) continue } return err @@ -355,7 +357,7 @@ func (s *redisScheduler) addToRedis(t Trigger, prev time.Time) error { switch t := t.(type) { case *EventTrigger: hKey := eventsKey(t) - return s.client.HSet(hKey, t.ID(), t.Infos().Arguments).Err() + return s.client.HSet(s.ctx, hKey, t.ID(), t.Infos().Arguments).Err() case *AtTrigger: timestamp = t.at case *CronTrigger: @@ -370,18 +372,18 @@ func (s *redisScheduler) addToRedis(t Trigger, prev time.Time) error { return errors.New("Not implemented yet") } pipe := s.client.Pipeline() - err := pipe.ZAdd(TriggersKey, &redis.Z{ + err := pipe.ZAdd(s.ctx, TriggersKey, &redis.Z{ Score: float64(timestamp.UTC().Unix()), Member: redisKey(t), }).Err() if err != nil { return err } - err = pipe.ZRem(SchedKey, redisKey(t)).Err() + err = pipe.ZRem(s.ctx, SchedKey, redisKey(t)).Err() if err != nil { return err } - _, err = pipe.Exec() + _, err = pipe.Exec(s.ctx) return err } @@ -420,13 +422,13 @@ func (s *redisScheduler) UpdateCron(db prefixer.Prefixer, trigger Trigger, argum } timestamp := updated.NextExecution(time.Now()) pipe := s.client.Pipeline() - pipe.ZRem(TriggersKey, redisKey(updated)) - pipe.ZRem(SchedKey, redisKey(updated)) - pipe.ZAdd(TriggersKey, &redis.Z{ + pipe.ZRem(s.ctx, TriggersKey, redisKey(updated)) + pipe.ZRem(s.ctx, SchedKey, redisKey(updated)) + pipe.ZAdd(s.ctx, TriggersKey, &redis.Z{ Score: float64(timestamp.UTC().Unix()), Member: redisKey(updated), }) - _, err = pipe.Exec() + _, err = pipe.Exec(s.ctx) return err } @@ -446,12 +448,12 @@ func (s *redisScheduler) deleteTrigger(t Trigger) error { } switch t.(type) { case *EventTrigger: - return s.client.HDel(eventsKey(t), t.ID()).Err() + return s.client.HDel(s.ctx, eventsKey(t), t.ID()).Err() case *AtTrigger, *CronTrigger: pipe := s.client.Pipeline() - pipe.ZRem(TriggersKey, redisKey(t)) - pipe.ZRem(SchedKey, redisKey(t)) - _, err := pipe.Exec() + pipe.ZRem(s.ctx, TriggersKey, redisKey(t)) + pipe.ZRem(s.ctx, SchedKey, redisKey(t)) + _, err := pipe.Exec(s.ctx) return err } return nil @@ -489,7 +491,7 @@ func (s *redisScheduler) GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error) func (s *redisScheduler) HasEventTrigger(trigger Trigger) bool { infos := trigger.Infos() key := eventsKey(trigger) - m, err := s.client.HGetAll(key).Result() + m, err := s.client.HGetAll(s.ctx, key).Result() if err != nil { s.log.Errorf("Could not fetch redis set %s: %s", key, err) return false @@ -514,7 +516,7 @@ func (s *redisScheduler) HasEventTrigger(trigger Trigger) bool { // CleanRedis removes clean redis by removing the two sets holding the triggers // states. func (s *redisScheduler) CleanRedis() error { - return s.client.Del(TriggersKey, SchedKey).Err() + return s.client.Del(s.ctx, TriggersKey, SchedKey).Err() } // RebuildRedis puts all the triggers in redis (idempotent) diff --git a/model/job/redis_scheduler_test.go b/model/job/redis_scheduler_test.go index d7b9d94071b..151010a7a90 100644 --- a/model/job/redis_scheduler_test.go +++ b/model/job/redis_scheduler_test.go @@ -17,7 +17,7 @@ import ( "github.com/cozy/cozy-stack/pkg/realtime" "github.com/cozy/cozy-stack/pkg/utils" "github.com/cozy/cozy-stack/tests/testutils" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) @@ -200,7 +200,7 @@ func TestRedisSchedulerWithTimeTriggers(t *testing.T) { func TestRedisSchedulerWithCronTriggers(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - err := client.Del(jobs.TriggersKey, jobs.SchedKey).Err() + err := client.Del(context.Background(), jobs.TriggersKey, jobs.SchedKey).Err() assert.NoError(t, err) bro := &mockBroker{} @@ -234,7 +234,7 @@ func TestRedisSchedulerWithCronTriggers(t *testing.T) { func TestRedisPollFromSchedKey(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - err := client.Del(jobs.TriggersKey, jobs.SchedKey).Err() + err := client.Del(context.Background(), jobs.TriggersKey, jobs.SchedKey).Err() assert.NoError(t, err) bro := &mockBroker{} @@ -261,7 +261,7 @@ func TestRedisPollFromSchedKey(t *testing.T) { ts := now.UTC().Unix() key := testInstance.DBPrefix() + "/" + tat.ID() - err = client.ZAdd(jobs.SchedKey, &redis.Z{ + err = client.ZAdd(context.Background(), jobs.SchedKey, &redis.Z{ Score: float64(ts + 1), Member: key, }).Err() @@ -283,7 +283,7 @@ func TestRedisPollFromSchedKey(t *testing.T) { func TestRedisTriggerEvent(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - err := client.Del(jobs.TriggersKey, jobs.SchedKey).Err() + err := client.Del(context.Background(), jobs.TriggersKey, jobs.SchedKey).Err() assert.NoError(t, err) bro := &mockBroker{} @@ -350,7 +350,7 @@ func (d fakeFilePather) FilePath(doc *vfs.FileDoc) (string, error) { func TestRedisTriggerEventForDirectories(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - err := client.Del(jobs.TriggersKey, jobs.SchedKey).Err() + err := client.Del(context.Background(), jobs.TriggersKey, jobs.SchedKey).Err() assert.NoError(t, err) bro := &mockBroker{} @@ -460,7 +460,7 @@ func TestRedisSchedulerWithDebounce(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - err := client.Del(jobs.TriggersKey, jobs.SchedKey).Err() + err := client.Del(context.Background(), jobs.TriggersKey, jobs.SchedKey).Err() assert.NoError(t, err) bro := &mockBroker{} @@ -520,7 +520,7 @@ func TestMain(m *testing.M) { cfg.Jobs.RedisConfig = was opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - return client.Del(jobs.TriggersKey, jobs.SchedKey).Err() + return client.Del(context.Background(), jobs.TriggersKey, jobs.SchedKey).Err() }) os.Exit(setup.Run()) diff --git a/model/move/store.go b/model/move/store.go index b5e739a8574..9d556ed1196 100644 --- a/model/move/store.go +++ b/model/move/store.go @@ -1,6 +1,7 @@ package move import ( + "context" "encoding/hex" "encoding/json" "sync" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) // Store is essentially an object to store and retrieve move requests @@ -41,7 +42,8 @@ func GetStore() Store { if cli == nil { globalStore = newMemStore() } else { - globalStore = &redisStore{cli} + ctx := context.Background() + globalStore = &redisStore{cli, ctx} } return globalStore } @@ -135,12 +137,13 @@ func (s *memStore) AllowDeleteAccounts(db prefixer.Prefixer) bool { } type redisStore struct { - c redis.UniversalClient + c redis.UniversalClient + ctx context.Context } func (s *redisStore) GetRequest(db prefixer.Prefixer, secret string) (*Request, error) { key := requestKey(db, secret) - b, err := s.c.Get(key).Bytes() + b, err := s.c.Get(s.ctx, key).Bytes() if err == redis.Nil { return nil, nil } @@ -161,7 +164,7 @@ func (s *redisStore) SaveRequest(db prefixer.Prefixer, req *Request) (string, er } secret := makeSecret() key := requestKey(db, secret) - if err = s.c.Set(key, v, storeTTL).Err(); err != nil { + if err = s.c.Set(s.ctx, key, v, storeTTL).Err(); err != nil { return "", err } return secret, nil @@ -169,17 +172,17 @@ func (s *redisStore) SaveRequest(db prefixer.Prefixer, req *Request) (string, er func (s *redisStore) SetAllowDeleteAccounts(db prefixer.Prefixer) error { key := deleteAccountsKey(db) - return s.c.Set(key, true, storeTTL).Err() + return s.c.Set(s.ctx, key, true, storeTTL).Err() } func (s *redisStore) ClearAllowDeleteAccounts(db prefixer.Prefixer) error { key := deleteAccountsKey(db) - return s.c.Del(key).Err() + return s.c.Del(s.ctx, key).Err() } func (s *redisStore) AllowDeleteAccounts(db prefixer.Prefixer) bool { key := deleteAccountsKey(db) - r, err := s.c.Exists(key).Result() + r, err := s.c.Exists(s.ctx, key).Result() if err != nil { return false } diff --git a/model/office/store.go b/model/office/store.go index e9e70f2ecf0..56e97c0617f 100644 --- a/model/office/store.go +++ b/model/office/store.go @@ -1,6 +1,7 @@ package office import ( + "context" "encoding/hex" "encoding/json" "sync" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) type conflictDetector struct { @@ -46,7 +47,8 @@ func GetStore() Store { if cli == nil { globalStore = newMemStore() } else { - globalStore = &redisStore{cli} + ctx := context.Background() + globalStore = &redisStore{cli, ctx} } return globalStore } @@ -137,12 +139,13 @@ func (s *memStore) RemoveDoc(db prefixer.Prefixer, secret string) error { } type redisStore struct { - c redis.UniversalClient + c redis.UniversalClient + ctx context.Context } func (s *redisStore) AddDoc(db prefixer.Prefixer, payload conflictDetector) (string, error) { idKey := docKey(db, payload.ID) - if secret, err := s.c.Get(idKey).Result(); err == nil { + if secret, err := s.c.Get(s.ctx, idKey).Result(); err == nil { return secret, nil } v, err := json.Marshal(payload) @@ -151,16 +154,16 @@ func (s *redisStore) AddDoc(db prefixer.Prefixer, payload conflictDetector) (str } secret := makeSecret() key := docKey(db, secret) - if err = s.c.Set(key, v, storeTTL).Err(); err != nil { + if err = s.c.Set(s.ctx, key, v, storeTTL).Err(); err != nil { return "", err } - _ = s.c.Set(idKey, secret, storeTTL) + _ = s.c.Set(s.ctx, idKey, secret, storeTTL) return secret, nil } func (s *redisStore) GetDoc(db prefixer.Prefixer, secret string) (*conflictDetector, error) { key := docKey(db, secret) - b, err := s.c.Get(key).Bytes() + b, err := s.c.Get(s.ctx, key).Bytes() if err == redis.Nil { return nil, nil } @@ -180,7 +183,7 @@ func (s *redisStore) UpdateDoc(db prefixer.Prefixer, secret string, payload conf return err } key := docKey(db, secret) - if err = s.c.Set(key, v, storeTTL).Err(); err != nil { + if err = s.c.Set(s.ctx, key, v, storeTTL).Err(); err != nil { return err } return nil @@ -190,10 +193,10 @@ func (s *redisStore) RemoveDoc(db prefixer.Prefixer, secret string) error { payload, _ := s.GetDoc(db, secret) if payload != nil { idKey := docKey(db, payload.ID) - _ = s.c.Del(idKey) + _ = s.c.Del(s.ctx, idKey) } key := docKey(db, secret) - return s.c.Del(key).Err() + return s.c.Del(s.ctx, key).Err() } func docKey(db prefixer.Prefixer, suffix string) string { diff --git a/model/session/registration_logins.go b/model/session/registration_logins.go index 4aa542ec7c5..448a7f4149e 100644 --- a/model/session/registration_logins.go +++ b/model/session/registration_logins.go @@ -99,7 +99,8 @@ func PushLoginRegistration(db prefixer.Prefixer, login *LoginEntry, clientID str if err != nil { return err } - return cli.HSet(redisRegistationKey, entry.Key(), b).Err() + ctx := context.Background() + return cli.HSet(ctx, redisRegistationKey, entry.Key(), b).Err() } registrationsMapLock.Lock() @@ -116,7 +117,8 @@ func RemoveLoginRegistration(domain, clientID string) error { var entryPtr *registrationEntry key := domain + "|" + clientID if cli := config.GetConfig().SessionStorage.Client(); cli != nil { - b, err := cli.HGet(redisRegistationKey, key).Result() + ctx := context.Background() + b, err := cli.HGet(ctx, redisRegistationKey, key).Result() if err != nil { return err } @@ -124,7 +126,7 @@ func RemoveLoginRegistration(domain, clientID string) error { if err = json.Unmarshal([]byte(b), &entry); err != nil { return err } - if err = cli.HDel(redisRegistationKey, key).Err(); err != nil { + if err = cli.HDel(ctx, redisRegistationKey, key).Err(); err != nil { return err } entryPtr = &entry @@ -148,8 +150,9 @@ func sweepRegistrations() (waitDuration time.Duration, err error) { now := time.Now() if cli := config.GetConfig().SessionStorage.Client(); cli != nil { + ctx := context.Background() var vals map[string]string - vals, err = cli.HGetAll(redisRegistationKey).Result() + vals, err = cli.HGetAll(ctx, redisRegistationKey).Result() if err != nil { return } @@ -174,7 +177,7 @@ func sweepRegistrations() (waitDuration time.Duration, err error) { } if len(deletedKeys) > 0 { - err = cli.HDel(redisRegistationKey, deletedKeys...).Err() + err = cli.HDel(ctx, redisRegistationKey, deletedKeys...).Err() } } else { registrationsMapLock.Lock() diff --git a/model/sharing/upload_store.go b/model/sharing/upload_store.go index ae46c6bbddb..8235b064d75 100644 --- a/model/sharing/upload_store.go +++ b/model/sharing/upload_store.go @@ -1,6 +1,7 @@ package sharing import ( + "context" "encoding/hex" "encoding/json" "sync" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) // A UploadStore is essentially an object to store files metadata by key @@ -38,7 +39,8 @@ func getStore() UploadStore { if cli == nil { globalStore = newMemStore() } else { - globalStore = &redisStore{cli} + ctx := context.Background() + globalStore = &redisStore{cli, ctx} } return globalStore } @@ -96,11 +98,12 @@ func (s *memStore) Save(db prefixer.Prefixer, doc *FileDocWithRevisions) (string } type redisStore struct { - c redis.UniversalClient + c redis.UniversalClient + ctx context.Context } func (s *redisStore) Get(db prefixer.Prefixer, key string) (*FileDocWithRevisions, error) { - b, err := s.c.Get(db.DBPrefix() + ":" + key).Bytes() + b, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Bytes() if err == redis.Nil { return nil, nil } @@ -120,7 +123,7 @@ func (s *redisStore) Save(db prefixer.Prefixer, doc *FileDocWithRevisions) (stri return "", err } key := makeSecret() - if err = s.c.Set(db.DBPrefix()+":"+key, v, uploadStoreTTL).Err(); err != nil { + if err = s.c.Set(s.ctx, db.DBPrefix()+":"+key, v, uploadStoreTTL).Err(); err != nil { return "", err } return key, nil diff --git a/model/vfs/store.go b/model/vfs/store.go index 5f853521997..938355367cd 100644 --- a/model/vfs/store.go +++ b/model/vfs/store.go @@ -1,6 +1,7 @@ package vfs import ( + "context" "encoding/hex" "encoding/json" "sync" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) // Store is essentially a place to store transient objects between two HTTP @@ -263,16 +264,18 @@ func (s *memStore) GetMetadata(db prefixer.Prefixer, key string) (*Metadata, err } type redisStore struct { - c redis.UniversalClient + c redis.UniversalClient + ctx context.Context } func newRedisStore(cli redis.UniversalClient) Store { - return &redisStore{cli} + ctx := context.Background() + return &redisStore{cli, ctx} } func (s *redisStore) AddFile(db prefixer.Prefixer, filePath string) (string, error) { key := makeSecret() - if err := s.c.Set(db.DBPrefix()+":"+key, filePath, storeTTL).Err(); err != nil { + if err := s.c.Set(s.ctx, db.DBPrefix()+":"+key, filePath, storeTTL).Err(); err != nil { return "", err } return key, nil @@ -280,7 +283,7 @@ func (s *redisStore) AddFile(db prefixer.Prefixer, filePath string) (string, err func (s *redisStore) AddThumb(db prefixer.Prefixer, fileID string) (string, error) { key := makeSecret() - if err := s.c.Set(db.DBPrefix()+":"+key, fileID, storeTTL).Err(); err != nil { + if err := s.c.Set(s.ctx, db.DBPrefix()+":"+key, fileID, storeTTL).Err(); err != nil { return "", err } return key, nil @@ -288,7 +291,7 @@ func (s *redisStore) AddThumb(db prefixer.Prefixer, fileID string) (string, erro func (s *redisStore) AddPreview(db prefixer.Prefixer, fileID string) (string, error) { key := makeSecret() - if err := s.c.Set(db.DBPrefix()+":"+key, fileID, storeTTL).Err(); err != nil { + if err := s.c.Set(s.ctx, db.DBPrefix()+":"+key, fileID, storeTTL).Err(); err != nil { return "", err } return key, nil @@ -296,7 +299,7 @@ func (s *redisStore) AddPreview(db prefixer.Prefixer, fileID string) (string, er func (s *redisStore) AddVersion(db prefixer.Prefixer, versionID string) (string, error) { key := makeSecret() - if err := s.c.Set(db.DBPrefix()+":"+key, versionID, storeTTL).Err(); err != nil { + if err := s.c.Set(s.ctx, db.DBPrefix()+":"+key, versionID, storeTTL).Err(); err != nil { return "", err } return key, nil @@ -308,7 +311,7 @@ func (s *redisStore) AddArchive(db prefixer.Prefixer, archive *Archive) (string, return "", err } key := makeSecret() - if err = s.c.Set(db.DBPrefix()+":"+key, v, storeTTL).Err(); err != nil { + if err = s.c.Set(s.ctx, db.DBPrefix()+":"+key, v, storeTTL).Err(); err != nil { return "", err } return key, nil @@ -320,14 +323,14 @@ func (s *redisStore) AddMetadata(db prefixer.Prefixer, metadata *Metadata) (stri return "", err } key := makeSecret() - if err = s.c.Set(db.DBPrefix()+":"+key, v, storeTTL).Err(); err != nil { + if err = s.c.Set(s.ctx, db.DBPrefix()+":"+key, v, storeTTL).Err(); err != nil { return "", err } return key, nil } func (s *redisStore) GetFile(db prefixer.Prefixer, key string) (string, error) { - f, err := s.c.Get(db.DBPrefix() + ":" + key).Result() + f, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Result() if err == redis.Nil { return "", ErrWrongToken } @@ -338,7 +341,7 @@ func (s *redisStore) GetFile(db prefixer.Prefixer, key string) (string, error) { } func (s *redisStore) GetThumb(db prefixer.Prefixer, key string) (string, error) { - f, err := s.c.Get(db.DBPrefix() + ":" + key).Result() + f, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Result() if err == redis.Nil { return "", ErrWrongToken } @@ -349,7 +352,7 @@ func (s *redisStore) GetThumb(db prefixer.Prefixer, key string) (string, error) } func (s *redisStore) GetPreview(db prefixer.Prefixer, key string) (string, error) { - f, err := s.c.Get(db.DBPrefix() + ":" + key).Result() + f, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Result() if err == redis.Nil { return "", ErrWrongToken } @@ -360,7 +363,7 @@ func (s *redisStore) GetPreview(db prefixer.Prefixer, key string) (string, error } func (s *redisStore) GetVersion(db prefixer.Prefixer, key string) (string, error) { - f, err := s.c.Get(db.DBPrefix() + ":" + key).Result() + f, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Result() if err == redis.Nil { return "", ErrWrongToken } @@ -371,7 +374,7 @@ func (s *redisStore) GetVersion(db prefixer.Prefixer, key string) (string, error } func (s *redisStore) GetArchive(db prefixer.Prefixer, key string) (*Archive, error) { - b, err := s.c.Get(db.DBPrefix() + ":" + key).Bytes() + b, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Bytes() if err == redis.Nil { return nil, ErrWrongToken } @@ -386,7 +389,7 @@ func (s *redisStore) GetArchive(db prefixer.Prefixer, key string) (*Archive, err } func (s *redisStore) GetMetadata(db prefixer.Prefixer, key string) (*Metadata, error) { - b, err := s.c.Get(db.DBPrefix() + ":" + key).Bytes() + b, err := s.c.Get(s.ctx, db.DBPrefix()+":"+key).Bytes() if err == redis.Nil { return nil, ErrWrongToken } diff --git a/model/vfs/store_test.go b/model/vfs/store_test.go index d99634e12b4..1a081603b87 100644 --- a/model/vfs/store_test.go +++ b/model/vfs/store_test.go @@ -5,7 +5,7 @@ import ( "time" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 06c56990457..6f09542da2b 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -3,12 +3,13 @@ package cache import ( "bytes" "compress/gzip" + "context" "io" "strings" "sync" "time" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) type cacheEntry struct { @@ -22,15 +23,17 @@ type cacheEntry struct { type Cache struct { client redis.UniversalClient m *sync.Map + ctx context.Context } // New returns a new Cache from a potentially nil redis client. func New(client redis.UniversalClient) Cache { + ctx := context.Background() if client != nil { - return Cache{client, nil} + return Cache{client, nil, ctx} } m := sync.Map{} - return Cache{nil, &m} + return Cache{nil, &m, ctx} } // CheckStatus checks that the cache is ready, or returns an error. @@ -39,7 +42,7 @@ func (c Cache) CheckStatus() (time.Duration, error) { return 0, nil } before := time.Now() - if err := c.client.Ping().Err(); err != nil { + if err := c.client.Ping(c.ctx).Err(); err != nil { return 0, err } return time.Since(before), nil @@ -57,7 +60,7 @@ func (c Cache) Get(key string) ([]byte, bool) { c.Clear(key) } } else { - cmd := c.client.Get(key) + cmd := c.client.Get(c.ctx, key) if b, err := cmd.Bytes(); err == nil { return b, true } @@ -73,7 +76,7 @@ func (c Cache) MultiGet(keys []string) [][]byte { results[i], _ = c.Get(key) } } else { - cmd := c.client.MGet(keys...) + cmd := c.client.MGet(c.ctx, keys...) for i, val := range cmd.Val() { if buf, ok := val.(string); ok { results[i] = []byte(buf) @@ -87,7 +90,7 @@ func (c Cache) MultiGet(keys []string) [][]byte { // Note: it can be slow and should be used carefully. func (c Cache) Keys(prefix string) []string { if c.client != nil { - cmd := c.client.Keys(prefix + "*") + cmd := c.client.Keys(c.ctx, prefix+"*") return cmd.Val() } results := make([]string, 0) @@ -106,7 +109,7 @@ func (c Cache) Clear(key string) { if c.client == nil { c.m.Delete(key) } else { - c.client.Del(key) + c.client.Del(c.ctx, key) } } @@ -118,7 +121,7 @@ func (c Cache) Set(key string, data []byte, expiration time.Duration) { expiredAt: time.Now().Add(expiration), }) } else { - c.client.Set(key, data, expiration) + c.client.Set(c.ctx, key, data, expiration) } } @@ -130,7 +133,7 @@ func (c Cache) SetNX(key string, data []byte, expiration time.Duration) { expiredAt: time.Now().Add(expiration), }) } else { - c.client.SetNX(key, data, expiration) + c.client.SetNX(c.ctx, key, data, expiration) } } @@ -165,6 +168,6 @@ func (c Cache) RefreshTTL(key string, expiration time.Duration) { c.m.Store(key, entry) } } else { - c.client.Expire(key, expiration) + c.client.Expire(c.ctx, key, expiration) } } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 1ffbf01c1a0..2c82ac36daa 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) diff --git a/pkg/config/config/config.go b/pkg/config/config/config.go index 0c5a6747af0..b747916fc9a 100644 --- a/pkg/config/config/config.go +++ b/pkg/config/config/config.go @@ -2,6 +2,7 @@ package config import ( "bytes" + "context" "crypto/rand" "errors" "fmt" @@ -27,7 +28,7 @@ import ( "github.com/cozy/cozy-stack/pkg/tlsclient" "github.com/cozy/cozy-stack/pkg/utils" "github.com/cozy/gomail" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/spf13/viper" ) @@ -831,10 +832,19 @@ func UseViper(v *viper.Viper) error { } w := logger.WithNamespace("go-redis").Writer() - redis.SetLogger(stdlog.New(w, "", 0)) + l := stdlog.New(w, "", 0) + redis.SetLogger(&contextPrint{l}) return nil } +type contextPrint struct { + l *stdlog.Logger +} + +func (c contextPrint) Printf(ctx context.Context, format string, args ...interface{}) { + c.l.Printf(format, args...) +} + // MakeVault initializes the global vault. func MakeVault(c *Config) error { var credsEncryptor *keymgmt.NACLKey diff --git a/pkg/limits/rate_limiting.go b/pkg/limits/rate_limiting.go index 0bd2d10d86b..2c3cf60770e 100644 --- a/pkg/limits/rate_limiting.go +++ b/pkg/limits/rate_limiting.go @@ -1,6 +1,7 @@ package limits import ( + "context" "errors" "strconv" "sync" @@ -8,7 +9,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) // CounterType os an enum for the type of counters used by rate-limiting. @@ -285,12 +286,13 @@ func (c *memCounter) Reset(key string) error { type redisCounter struct { Client redis.UniversalClient + ctx context.Context } // NewRedisCounter returns a counter that can be mutualized between several // cozy-stack processes by using redis. func NewRedisCounter(client redis.UniversalClient) Counter { - return &redisCounter{client} + return &redisCounter{client, context.Background()} } // incrWithTTL is a lua script for redis to increment a counter and sets a TTL @@ -305,7 +307,7 @@ return n func (r *redisCounter) Increment(key string, timeLimit time.Duration) (int64, error) { ttl := strconv.FormatInt(int64(timeLimit/time.Second), 10) - count, err := r.Client.Eval(incrWithTTL, []string{key, ttl}).Result() + count, err := r.Client.Eval(r.ctx, incrWithTTL, []string{key, ttl}).Result() if err != nil { return 0, err } @@ -313,7 +315,7 @@ func (r *redisCounter) Increment(key string, timeLimit time.Duration) (int64, er } func (r *redisCounter) Reset(key string) error { - _, err := r.Client.Del(key).Result() + _, err := r.Client.Del(r.ctx, key).Result() return err } diff --git a/pkg/limits/rate_limiting_test.go b/pkg/limits/rate_limiting_test.go index d4ae35b16c2..8c0bfee0cb3 100644 --- a/pkg/limits/rate_limiting_test.go +++ b/pkg/limits/rate_limiting_test.go @@ -1,10 +1,11 @@ package limits import ( + "context" "testing" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) @@ -29,7 +30,7 @@ func TestLoginRateExceededMem(t *testing.T) { func TestLoginRateNotExceededRedis(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - client.Del("auth:" + testInstance.DomainName()) + client.Del(context.Background(), "auth:"+testInstance.DomainName()) globalCounter = NewRedisCounter(client) assert.NoError(t, CheckRateLimit(testInstance, AuthType)) } @@ -38,7 +39,7 @@ func TestLoginRateExceededRedis(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) globalCounter = NewRedisCounter(client) - client.Del("auth:" + testInstance.DomainName()) + client.Del(context.Background(), "auth:"+testInstance.DomainName()) for i := 1; i <= 1000; i++ { assert.NoError(t, CheckRateLimit(testInstance, AuthType)) } @@ -62,7 +63,7 @@ func Test2FAGenerationExceededMem(t *testing.T) { func Test2FAGenerationNotExceededRedis(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - client.Del("two-factor-generation:" + testInstance.DomainName()) + client.Del(context.Background(), "two-factor-generation:"+testInstance.DomainName()) globalCounter = NewRedisCounter(client) assert.NoError(t, CheckRateLimit(testInstance, TwoFactorGenerationType)) } @@ -71,7 +72,7 @@ func Test2FAGenerationExceededRedis(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) globalCounter = NewRedisCounter(client) - client.Del("two-factor-generation:" + testInstance.DomainName()) + client.Del(context.Background(), "two-factor-generation:"+testInstance.DomainName()) for i := 1; i <= 20; i++ { assert.NoError(t, CheckRateLimit(testInstance, TwoFactorGenerationType)) } @@ -95,7 +96,7 @@ func Test2FARateExceededMem(t *testing.T) { func Test2FANotExceededRedis(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) - client.Del("two-factor:" + testInstance.DomainName()) + client.Del(context.Background(), "two-factor:"+testInstance.DomainName()) globalCounter = NewRedisCounter(client) assert.NoError(t, CheckRateLimit(testInstance, TwoFactorType)) } @@ -104,7 +105,7 @@ func Test2FAExceededRedis(t *testing.T) { opts, _ := redis.ParseURL(redisURL) client := redis.NewClient(opts) globalCounter = NewRedisCounter(client) - client.Del("two-factor:" + testInstance.DomainName()) + client.Del(context.Background(), "two-factor:"+testInstance.DomainName()) for i := 1; i <= 10; i++ { assert.NoError(t, CheckRateLimit(testInstance, TwoFactorType)) } diff --git a/pkg/lock/simple_redis.go b/pkg/lock/simple_redis.go index 19f42ec689b..b90b5264a95 100644 --- a/pkg/lock/simple_redis.go +++ b/pkg/lock/simple_redis.go @@ -1,6 +1,7 @@ package lock import ( + "context" "errors" "math/rand" "strconv" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/logger" "github.com/cozy/cozy-stack/pkg/utils" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/sirupsen/logrus" ) @@ -17,8 +18,8 @@ const luaRefresh = `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.c const luaRelease = `if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end` type subRedisInterface interface { - SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd - Eval(script string, keys []string, args ...interface{}) *redis.Cmd + SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.BoolCmd + Eval(ctx context.Context, script string, keys []string, args ...interface{}) *redis.Cmd } const ( @@ -48,6 +49,7 @@ var ( type redisLock struct { client subRedisInterface + ctx context.Context mu sync.Mutex key string token string @@ -65,7 +67,7 @@ func (rl *redisLock) extends() (bool, error) { // we already have a lock, attempts to extends it ttl := strconv.FormatInt(int64(LockTimeout/time.Millisecond), 10) - ok, err := rl.client.Eval(luaRefresh, []string{rl.key}, rl.token, ttl).Result() + ok, err := rl.client.Eval(rl.ctx, luaRefresh, []string{rl.key}, rl.token, ttl).Result() if err != nil { return false, err // most probably redis connectivity error } @@ -80,7 +82,7 @@ func (rl *redisLock) extends() (bool, error) { func (rl *redisLock) obtains(writing bool, token string) (bool, error) { // Try to obtain a lock - ok, err := rl.client.SetNX(rl.key, token, LockTimeout).Result() + ok, err := rl.client.SetNX(rl.ctx, rl.key, token, LockTimeout).Result() if err != nil { return false, err // most probably redis connectivity error } @@ -165,7 +167,7 @@ func (rl *redisLock) unlock(writing bool) { return } - _, err := rl.client.Eval(luaRelease, []string{rl.key}, rl.token).Result() + _, err := rl.client.Eval(rl.ctx, luaRelease, []string{rl.key}, rl.token).Result() if err != nil { rl.log.Warnf("Failed to unlock: %s", err.Error()) } @@ -194,6 +196,7 @@ var redislocksMu sync.Mutex func makeRedisSimpleLock(c subRedisInterface, ns string) *redisLock { return &redisLock{ client: c, + ctx: context.Background(), key: basicLockNS + ns, log: logger.WithDomain(ns).WithField("nspace", "redis-lock"), rng: rand.New(rand.NewSource(time.Now().UnixNano())), diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index a307e8bc75f..ac202b830bc 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -1,13 +1,14 @@ package logger import ( + "context" "io/ioutil" "strings" "sync" "time" build "github.com/cozy/cozy-stack/pkg/config" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/sirupsen/logrus" ) @@ -63,8 +64,9 @@ func Init(opt Options) error { formatter.TimestampFormat = time.RFC3339Nano } if cli := opt.Redis; cli != nil { - go subscribeLoggersDebug(cli) - go loadDebug(cli) + ctx := context.Background() + go subscribeLoggersDebug(ctx, cli) + go loadDebug(ctx, cli) } opts = opt return nil @@ -87,7 +89,8 @@ func Clone(in *logrus.Logger) *logrus.Logger { // AddDebugDomain adds the specified domain to the debug list. func AddDebugDomain(domain string, ttl time.Duration) error { if cli := opts.Redis; cli != nil { - return publishDebug(cli, debugRedisAddChannel, domain, ttl) + ctx := context.Background() + return publishDebug(ctx, cli, debugRedisAddChannel, domain, ttl) } addDebugDomain(domain, ttl) return nil @@ -96,7 +99,8 @@ func AddDebugDomain(domain string, ttl time.Duration) error { // RemoveDebugDomain removes the specified domain from the debug list. func RemoveDebugDomain(domain string) error { if cli := opts.Redis; cli != nil { - return publishDebug(cli, debugRedisRmvChannel, domain, 0) + ctx := context.Background() + return publishDebug(ctx, cli, debugRedisRmvChannel, domain, 0) } removeDebugDomain(domain) return nil @@ -147,8 +151,8 @@ func removeDebugDomain(domain string) { delete(loggers, domain) } -func subscribeLoggersDebug(cli redis.UniversalClient) { - sub := cli.Subscribe(debugRedisAddChannel, debugRedisRmvChannel) +func subscribeLoggersDebug(ctx context.Context, cli redis.UniversalClient) { + sub := cli.Subscribe(ctx, debugRedisAddChannel, debugRedisRmvChannel) for msg := range sub.Channel() { parts := strings.Split(msg.Payload, "/") domain := parts[0] @@ -165,13 +169,13 @@ func subscribeLoggersDebug(cli redis.UniversalClient) { } } -func loadDebug(cli redis.UniversalClient) { - keys, err := cli.Keys(debugRedisPrefix + "*").Result() +func loadDebug(ctx context.Context, cli redis.UniversalClient) { + keys, err := cli.Keys(ctx, debugRedisPrefix+"*").Result() if err != nil { return } for _, key := range keys { - ttl, err := cli.TTL(key).Result() + ttl, err := cli.TTL(ctx, key).Result() if err != nil { continue } @@ -180,16 +184,16 @@ func loadDebug(cli redis.UniversalClient) { } } -func publishDebug(cli redis.UniversalClient, channel, domain string, ttl time.Duration) error { - err := cli.Publish(channel, domain+"/"+ttl.String()).Err() +func publishDebug(ctx context.Context, cli redis.UniversalClient, channel, domain string, ttl time.Duration) error { + err := cli.Publish(ctx, channel, domain+"/"+ttl.String()).Err() if err != nil { return err } key := debugRedisPrefix + domain if channel == debugRedisAddChannel { - err = cli.Set(key, 0, ttl).Err() + err = cli.Set(ctx, key, 0, ttl).Err() } else { - err = cli.Del(key).Err() + err = cli.Del(ctx, key).Err() } return err } diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index 6937e47972e..b876e33dc4a 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) diff --git a/pkg/realtime/realtime_test.go b/pkg/realtime/realtime_test.go index 416d2fa075e..038f0e5f08b 100644 --- a/pkg/realtime/realtime_test.go +++ b/pkg/realtime/realtime_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" "github.com/stretchr/testify/assert" ) diff --git a/pkg/realtime/redis_hub.go b/pkg/realtime/redis_hub.go index 2ca68675ee3..60dac917a68 100644 --- a/pkg/realtime/redis_hub.go +++ b/pkg/realtime/redis_hub.go @@ -1,26 +1,29 @@ package realtime import ( + "context" "encoding/json" "strings" "github.com/cozy/cozy-stack/pkg/logger" "github.com/cozy/cozy-stack/pkg/prefixer" - redis "github.com/go-redis/redis/v7" + redis "github.com/go-redis/redis/v8" ) const eventsRedisKey = "realtime:events" type redisHub struct { c redis.UniversalClient + ctx context.Context mem *memHub local *topic } func newRedisHub(c redis.UniversalClient) *redisHub { + ctx := context.Background() local := newTopic("*") mem := newMemHub() - hub := &redisHub{c, mem, local} + hub := &redisHub{c, ctx, mem, local} go hub.start() return hub } @@ -90,7 +93,7 @@ func (j *jsonEvent) UnmarshalJSON(buf []byte) error { } func (h *redisHub) start() { - sub := h.c.Subscribe(eventsRedisKey) + sub := h.c.Subscribe(h.ctx, eventsRedisKey) log := logger.WithNamespace("realtime-redis") for msg := range sub.Channel() { je := jsonEvent{} @@ -129,7 +132,7 @@ func (h *redisHub) Publish(db prefixer.Prefixer, verb string, doc, oldDoc Doc) { log.Warnf("Error on publish: %s", err) return } - h.c.Publish(eventsRedisKey, e.Doc.DocType()+","+string(buf)) + h.c.Publish(h.ctx, eventsRedisKey, e.Doc.DocType()+","+string(buf)) } func (h *redisHub) Subscriber(db prefixer.Prefixer) *DynamicSubscriber { diff --git a/web/accounts/statestore.go b/web/accounts/statestore.go index 2268239f30c..942778522e1 100644 --- a/web/accounts/statestore.go +++ b/web/accounts/statestore.go @@ -1,6 +1,7 @@ package accounts import ( + "context" "encoding/hex" "encoding/json" "sync" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/logger" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) const stateTTL = 15 * time.Minute @@ -51,12 +52,13 @@ func (store memStateStorage) Find(ref string) *stateHolder { } type subRedisInterface interface { - Get(key string) *redis.StringCmd - Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd + Get(ctx context.Context, key string) *redis.StringCmd + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd } type redisStateStorage struct { - cl subRedisInterface + cl subRedisInterface + ctx context.Context } func (store *redisStateStorage) Add(s *stateHolder) (string, error) { @@ -65,11 +67,11 @@ func (store *redisStateStorage) Add(s *stateHolder) (string, error) { if err != nil { return "", err } - return ref, store.cl.Set(ref, bb, stateTTL).Err() + return ref, store.cl.Set(store.ctx, ref, bb, stateTTL).Err() } func (store *redisStateStorage) Find(ref string) *stateHolder { - bb, err := store.cl.Get(ref).Bytes() + bb, err := store.cl.Get(store.ctx, ref).Bytes() if err != nil { return nil } @@ -96,7 +98,8 @@ func getStorage() stateStorage { if cli == nil { globalStorage = &memStateStorage{} } else { - globalStorage = &redisStateStorage{cl: cli} + ctx := context.Background() + globalStorage = &redisStateStorage{cl: cli, ctx: ctx} } return globalStorage } diff --git a/web/auth/store.go b/web/auth/store.go index c5417c65e27..79e148a34a5 100644 --- a/web/auth/store.go +++ b/web/auth/store.go @@ -1,6 +1,7 @@ package auth import ( + "context" "encoding/hex" "sync" "time" @@ -8,7 +9,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/prefixer" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) // Store is essentially an object to store and retrieve confirmation codes @@ -37,7 +38,8 @@ func GetStore() Store { if cli == nil { globalStore = newMemStore() } else { - globalStore = &redisStore{cli} + ctx := context.Background() + globalStore = &redisStore{cli, ctx} } return globalStore } @@ -89,13 +91,14 @@ func (s *memStore) GetCode(db prefixer.Prefixer, code string) (bool, error) { } type redisStore struct { - c redis.UniversalClient + c redis.UniversalClient + ctx context.Context } func (s *redisStore) AddCode(db prefixer.Prefixer) (string, error) { code := makeSecret() key := confirmKey(db, code) - if err := s.c.Set(key, "1", storeTTL).Err(); err != nil { + if err := s.c.Set(s.ctx, key, "1", storeTTL).Err(); err != nil { return "", err } return code, nil @@ -103,7 +106,7 @@ func (s *redisStore) AddCode(db prefixer.Prefixer) (string, error) { func (s *redisStore) GetCode(db prefixer.Prefixer, code string) (bool, error) { key := confirmKey(db, code) - n, err := s.c.Exists(key).Result() + n, err := s.c.Exists(s.ctx, key).Result() if err == redis.Nil || n == 0 { return false, nil } diff --git a/web/oidc/statestore.go b/web/oidc/statestore.go index bfa3fa02aea..c90bbe9e859 100644 --- a/web/oidc/statestore.go +++ b/web/oidc/statestore.go @@ -1,6 +1,7 @@ package oidc import ( + "context" "encoding/hex" "encoding/json" "sync" @@ -9,7 +10,7 @@ import ( "github.com/cozy/cozy-stack/pkg/config/config" "github.com/cozy/cozy-stack/pkg/crypto" "github.com/cozy/cozy-stack/pkg/logger" - "github.com/go-redis/redis/v7" + "github.com/go-redis/redis/v8" ) const stateTTL = 15 * time.Minute @@ -61,12 +62,13 @@ func (store memStateStorage) Find(id string) *stateHolder { } type subRedisInterface interface { - Get(key string) *redis.StringCmd - Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd + Get(ctx context.Context, key string) *redis.StringCmd + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *redis.StatusCmd } type redisStateStorage struct { - cl subRedisInterface + cl subRedisInterface + ctx context.Context } func (store *redisStateStorage) Add(s *stateHolder) error { @@ -74,11 +76,11 @@ func (store *redisStateStorage) Add(s *stateHolder) error { if err != nil { return err } - return store.cl.Set(s.id, serialized, stateTTL).Err() + return store.cl.Set(store.ctx, s.id, serialized, stateTTL).Err() } func (store *redisStateStorage) Find(id string) *stateHolder { - serialized, err := store.cl.Get(id).Bytes() + serialized, err := store.cl.Get(store.ctx, id).Bytes() if err != nil { return nil } @@ -105,7 +107,8 @@ func getStorage() stateStorage { if cli == nil { globalStorage = &memStateStorage{} } else { - globalStorage = &redisStateStorage{cl: cli} + ctx := context.Background() + globalStorage = &redisStateStorage{cl: cli, ctx: ctx} } return globalStorage }