Skip to content

Commit

Permalink
Update module github.com/go-redis/redis/v7 to v8
Browse files Browse the repository at this point in the history
  • Loading branch information
renovate-bot authored and nono committed Oct 11, 2021
1 parent 761acae commit 6c707b6
Show file tree
Hide file tree
Showing 25 changed files with 228 additions and 175 deletions.
4 changes: 1 addition & 3 deletions go.mod
Expand Up @@ -14,7 +14,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/dhowden/tag v0.0.0-20201120070457-d52dcb253c63
github.com/dustin/go-humanize v1.0.0
github.com/go-redis/redis/v7 v7.4.1
github.com/go-redis/redis/v8 v8.11.3
github.com/gofrs/uuid v4.0.0+incompatible
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
github.com/golang/gddo v0.0.0-20210115222349-20d68f94ee1f
Expand All @@ -33,8 +33,6 @@ require (
github.com/mssola/user_agent v0.5.3
github.com/ncw/swift v1.0.53
github.com/nightlyone/lockfile v1.0.0
github.com/onsi/ginkgo v1.15.0 // indirect
github.com/onsi/gomega v1.10.5 // indirect
github.com/oschwald/maxminddb-golang v1.8.0
github.com/pquerna/otp v1.3.0
github.com/prometheus/client_golang v1.11.0
Expand Down
23 changes: 12 additions & 11 deletions go.sum
Expand Up @@ -107,6 +107,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/dhowden/tag v0.0.0-20201120070457-d52dcb253c63 h1:/u5RVRk3Nh7Zw1QQnPtUH5kzcc8JmSSRpHSlGU/zGTE=
github.com/dhowden/tag v0.0.0-20201120070457-d52dcb253c63/go.mod h1:SniNVYuaD1jmdEEvi+7ywb1QFR7agjeTdGKyFb0p7Rw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down Expand Up @@ -139,10 +141,11 @@ github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.2.6-0.20210915003542-8b1f7f90f6b1/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.3 h1:GCjoYp8c+yQTJfc0n69iwSiHjvuAdruxl7elnZCxgt8=
github.com/go-redis/redis/v8 v8.11.3/go.mod h1:xNJ9xDG09FsIPwh3bWdk+0oDWHbtF9rPN0F/oD9XeKc=
github.com/go-stack/stack v1.6.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down Expand Up @@ -362,18 +365,17 @@ github.com/ncw/swift v1.0.53 h1:luHjjTNtekIEvHg5KdAFIBaH7bWfNkefwFnpDffSIks=
github.com/ncw/swift v1.0.53/go.mod h1:23YIA4yWVnGwv2dQlN4bB7egfYX6YLn0Yo/S6zZO/ZM=
github.com/nightlyone/lockfile v1.0.0 h1:RHep2cFKK4PonZJDdEl4GmkabuhbsRMgk/k3uAmxBiA=
github.com/nightlyone/lockfile v1.0.0/go.mod h1:rywoIealpdNse2r832aiD9jRk8ErCatROs6LzC841CI=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/onsi/gomega v1.15.0 h1:WjP/FQ/sk43MRmnEcT+MlDw2TFvkrXlprrPST/IudjU=
github.com/onsi/gomega v1.15.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
github.com/oschwald/maxminddb-golang v1.8.0 h1:Uh/DSnGoxsyp/KYbY1AuP0tYEwfs0sCph9p/UMXK/Hk=
github.com/oschwald/maxminddb-golang v1.8.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -573,12 +575,12 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210913180222-943fd674d43e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211008194852-3b03d305991f h1:1scJEYZBaF48BaG6tYbtxmLcXqwYGSfGcMoStTqkkIw=
Expand Down Expand Up @@ -639,7 +641,6 @@ golang.org/x/sys v0.0.0-20190924154521-2837fb4f24fe/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
12 changes: 7 additions & 5 deletions model/job/redis_broker.go
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/cozy/cozy-stack/pkg/config/config"
"github.com/cozy/cozy-stack/pkg/limits"
"github.com/cozy/cozy-stack/pkg/prefixer"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
multierror "github.com/hashicorp/go-multierror"
"github.com/sirupsen/logrus"
)
Expand All @@ -25,6 +25,7 @@ const (

type redisBroker struct {
client redis.UniversalClient
ctx context.Context
workers []*Worker
workersRunning []*Worker
workersTypes []string
Expand All @@ -37,6 +38,7 @@ type redisBroker struct {
func NewRedisBroker(client redis.UniversalClient) Broker {
return &redisBroker{
client: client,
ctx: context.Background(),
closed: make(chan struct{}),
}
}
Expand Down Expand Up @@ -150,7 +152,7 @@ func (b *redisBroker) pollLoop(key string, ch chan<- *Job) {
if rng.Intn(3) == 0 {
keyP1, keyP0 = keyP0, keyP1
}
results, err := b.client.BRPop(redisBRPopTimeout, keyP0, keyP1).Result()
results, err := b.client.BRPop(b.ctx, redisBRPopTimeout, keyP0, keyP1).Result()
if err != nil || len(results) < 2 {
time.Sleep(100 * time.Millisecond)
continue
Expand Down Expand Up @@ -243,7 +245,7 @@ func (b *redisBroker) PushJob(db prefixer.Prefixer, req *JobRequest) (*Job, erro
key += redisHighPrioritySuffix
}

if err := b.client.LPush(key, val).Err(); err != nil {
if err := b.client.LPush(b.ctx, key, val).Err(); err != nil {
return nil, err
}

Expand All @@ -254,11 +256,11 @@ func (b *redisBroker) PushJob(db prefixer.Prefixer, req *JobRequest) (*Job, erro
// specified worker type.
func (b *redisBroker) WorkerQueueLen(workerType string) (int, error) {
key := redisPrefix + workerType
l1, err := b.client.LLen(key).Result()
l1, err := b.client.LLen(b.ctx, key).Result()
if err != nil {
return 0, err
}
l2, err := b.client.LLen(key + redisHighPrioritySuffix).Result()
l2, err := b.client.LLen(b.ctx, key+redisHighPrioritySuffix).Result()
if err != nil {
return 0, err
}
Expand Down
2 changes: 1 addition & 1 deletion model/job/redis_broker_test.go
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/cozy/cozy-stack/model/job"
jobs "github.com/cozy/cozy-stack/model/job"
"github.com/cozy/cozy-stack/pkg/limits"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/stretchr/testify/assert"
)

Expand Down
68 changes: 35 additions & 33 deletions model/job/redis_scheduler.go
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/cozy/cozy-stack/pkg/logger"
"github.com/cozy/cozy-stack/pkg/prefixer"
"github.com/cozy/cozy-stack/pkg/realtime"
"github.com/go-redis/redis/v7"
"github.com/go-redis/redis/v8"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -57,6 +57,7 @@ return t`
type redisScheduler struct {
broker Broker
client redis.UniversalClient
ctx context.Context
closed chan struct{}
stopped chan struct{}
log *logrus.Entry
Expand All @@ -67,6 +68,7 @@ type redisScheduler struct {
func NewRedisScheduler(client redis.UniversalClient) Scheduler {
return &redisScheduler{
client: client,
ctx: context.Background(),
log: logger.WithNamespace("scheduler-redis"),
stopped: make(chan struct{}),
}
Expand Down Expand Up @@ -136,7 +138,7 @@ func (s *redisScheduler) startEventDispatcher() {
func (s *redisScheduler) eventLoop(eventsCh <-chan *realtime.Event) {
for event := range eventsCh {
key := eventsKey(event)
m, err := s.client.HGetAll(key).Result()
m, err := s.client.HGetAll(s.ctx, key).Result()
if err != nil {
s.log.Errorf("Could not fetch redis set %s: %s",
key, err.Error())
Expand Down Expand Up @@ -170,7 +172,7 @@ func (s *redisScheduler) eventLoop(eventsCh <-chan *realtime.Event) {
var d time.Duration
if d, err = time.ParseDuration(et.Infos().Debounce); err == nil {
timestamp := time.Now().Add(d)
s.client.ZAddNX(TriggersKey, &redis.Z{
s.client.ZAddNX(s.ctx, TriggersKey, &redis.Z{
Score: float64(timestamp.UTC().Unix()),
Member: redisKey(t),
})
Expand Down Expand Up @@ -217,15 +219,15 @@ func (s *redisScheduler) fire(trigger Trigger, request *JobRequest) {
pipe := s.client.Pipeline()
switch trigger.CombineRequest() {
case appendPayload:
pipe.RPush(payloadKey(trigger), string(request.Payload))
pipe.RPush(s.ctx, payloadKey(trigger), string(request.Payload))
case keepOriginalRequest:
pipe.SetNX(payloadKey(trigger), string(request.Payload), 30*24*time.Hour)
pipe.SetNX(s.ctx, payloadKey(trigger), string(request.Payload), 30*24*time.Hour)
}
pipe.ZAddNX(TriggersKey, &redis.Z{
pipe.ZAddNX(s.ctx, TriggersKey, &redis.Z{
Score: float64(timestamp.UTC().Unix()),
Member: redisKey(trigger),
})
if _, err := pipe.Exec(); err != nil {
if _, err := pipe.Exec(s.ctx); err != nil {
s.log.Warnf("Cannot fire trigger because of redis error: %s", err)
}
}
Expand All @@ -251,7 +253,7 @@ func (s *redisScheduler) ShutdownScheduler(ctx context.Context) error {
func (s *redisScheduler) PollScheduler(now int64) error {
keys := []string{strconv.FormatInt(now, 10)}
for {
res, err := s.client.Eval(luaPoll, keys).Result()
res, err := s.client.Eval(s.ctx, luaPoll, keys).Result()
if err != nil || res == nil {
return err
}
Expand All @@ -264,39 +266,39 @@ func (s *redisScheduler) PollScheduler(now int64) error {
}
parts := strings.SplitN(results[0].(string), "/", 2)
if len(parts) != 2 {
s.client.ZRem(SchedKey, results[0])
s.client.ZRem(s.ctx, SchedKey, results[0])
return fmt.Errorf("Invalid key %s", res)
}

prefix := parts[0]
t, err := s.GetTrigger(prefixer.NewPrefixer("", prefix), parts[1])
if err != nil {
if err == ErrNotFoundTrigger || err == ErrMalformedTrigger {
s.client.ZRem(SchedKey, results[0])
s.client.ZRem(s.ctx, SchedKey, results[0])
}
return err
}
switch t := t.(type) {
case *EventTrigger, *WebhookTrigger: // Debounced
job := t.Infos().JobRequest()
job.Debounced = true
if err = s.client.ZRem(SchedKey, results[0]).Err(); err != nil {
if err = s.client.ZRem(s.ctx, SchedKey, results[0]).Err(); err != nil {
return err
}
switch t.CombineRequest() {
case appendPayload:
pipe := s.client.Pipeline()
lrange := pipe.LRange(payloadKey(t), 0, -1)
pipe.Del(payloadKey(t))
if _, err := pipe.Exec(); err == nil {
lrange := pipe.LRange(s.ctx, payloadKey(t), 0, -1)
pipe.Del(s.ctx, payloadKey(t))
if _, err := pipe.Exec(s.ctx); err == nil {
payloads := strings.Join(lrange.Val(), ",")
job.Payload = Payload(`{"payloads":[` + payloads + "]}")
}
case keepOriginalRequest:
pipe := s.client.Pipeline()
get := pipe.Get(payloadKey(t))
pipe.Del(payloadKey(t))
if _, err := pipe.Exec(); err == nil {
get := pipe.Get(s.ctx, payloadKey(t))
pipe.Del(s.ctx, payloadKey(t))
if _, err := pipe.Exec(s.ctx); err == nil {
job.Payload = Payload(get.Val())
}
}
Expand All @@ -307,7 +309,7 @@ func (s *redisScheduler) PollScheduler(now int64) error {
job := t.Infos().JobRequest()
if _, err = s.broker.PushJob(t, job); err != nil {
if limits.IsLimitReachedOrExceeded(err) {
s.client.ZRem(SchedKey, results[0])
s.client.ZRem(s.ctx, SchedKey, results[0])
}
return err
}
Expand All @@ -320,7 +322,7 @@ func (s *redisScheduler) PollScheduler(now int64) error {
// Remove the cron trigger from redis if it is invalid, as it
// may block other cron triggers
if err == ErrUnknownWorker || limits.IsLimitReachedOrExceeded(err) {
s.client.ZRem(SchedKey, results[0])
s.client.ZRem(s.ctx, SchedKey, results[0])
continue
}
return err
Expand Down Expand Up @@ -355,7 +357,7 @@ func (s *redisScheduler) addToRedis(t Trigger, prev time.Time) error {
switch t := t.(type) {
case *EventTrigger:
hKey := eventsKey(t)
return s.client.HSet(hKey, t.ID(), t.Infos().Arguments).Err()
return s.client.HSet(s.ctx, hKey, t.ID(), t.Infos().Arguments).Err()
case *AtTrigger:
timestamp = t.at
case *CronTrigger:
Expand All @@ -370,18 +372,18 @@ func (s *redisScheduler) addToRedis(t Trigger, prev time.Time) error {
return errors.New("Not implemented yet")
}
pipe := s.client.Pipeline()
err := pipe.ZAdd(TriggersKey, &redis.Z{
err := pipe.ZAdd(s.ctx, TriggersKey, &redis.Z{
Score: float64(timestamp.UTC().Unix()),
Member: redisKey(t),
}).Err()
if err != nil {
return err
}
err = pipe.ZRem(SchedKey, redisKey(t)).Err()
err = pipe.ZRem(s.ctx, SchedKey, redisKey(t)).Err()
if err != nil {
return err
}
_, err = pipe.Exec()
_, err = pipe.Exec(s.ctx)
return err
}

Expand Down Expand Up @@ -420,13 +422,13 @@ func (s *redisScheduler) UpdateCron(db prefixer.Prefixer, trigger Trigger, argum
}
timestamp := updated.NextExecution(time.Now())
pipe := s.client.Pipeline()
pipe.ZRem(TriggersKey, redisKey(updated))
pipe.ZRem(SchedKey, redisKey(updated))
pipe.ZAdd(TriggersKey, &redis.Z{
pipe.ZRem(s.ctx, TriggersKey, redisKey(updated))
pipe.ZRem(s.ctx, SchedKey, redisKey(updated))
pipe.ZAdd(s.ctx, TriggersKey, &redis.Z{
Score: float64(timestamp.UTC().Unix()),
Member: redisKey(updated),
})
_, err = pipe.Exec()
_, err = pipe.Exec(s.ctx)
return err
}

Expand All @@ -446,12 +448,12 @@ func (s *redisScheduler) deleteTrigger(t Trigger) error {
}
switch t.(type) {
case *EventTrigger:
return s.client.HDel(eventsKey(t), t.ID()).Err()
return s.client.HDel(s.ctx, eventsKey(t), t.ID()).Err()
case *AtTrigger, *CronTrigger:
pipe := s.client.Pipeline()
pipe.ZRem(TriggersKey, redisKey(t))
pipe.ZRem(SchedKey, redisKey(t))
_, err := pipe.Exec()
pipe.ZRem(s.ctx, TriggersKey, redisKey(t))
pipe.ZRem(s.ctx, SchedKey, redisKey(t))
_, err := pipe.Exec(s.ctx)
return err
}
return nil
Expand Down Expand Up @@ -489,7 +491,7 @@ func (s *redisScheduler) GetAllTriggers(db prefixer.Prefixer) ([]Trigger, error)
func (s *redisScheduler) HasEventTrigger(trigger Trigger) bool {
infos := trigger.Infos()
key := eventsKey(trigger)
m, err := s.client.HGetAll(key).Result()
m, err := s.client.HGetAll(s.ctx, key).Result()
if err != nil {
s.log.Errorf("Could not fetch redis set %s: %s", key, err)
return false
Expand All @@ -514,7 +516,7 @@ func (s *redisScheduler) HasEventTrigger(trigger Trigger) bool {
// CleanRedis removes clean redis by removing the two sets holding the triggers
// states.
func (s *redisScheduler) CleanRedis() error {
return s.client.Del(TriggersKey, SchedKey).Err()
return s.client.Del(s.ctx, TriggersKey, SchedKey).Err()
}

// RebuildRedis puts all the triggers in redis (idempotent)
Expand Down

0 comments on commit 6c707b6

Please sign in to comment.