Skip to content

Commit

Permalink
Merge pull request go-redsync#80 from KeiichiHirobe/set-timeout-for-lock
Browse files Browse the repository at this point in the history
set timeout for request to Redis when trying to acquire a lock
  • Loading branch information
hjr265 committed Dec 20, 2021
2 parents a744966 + 62c8209 commit 52511c8
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 22 deletions.
27 changes: 18 additions & 9 deletions mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ type Mutex struct {
tries int
delayFunc DelayFunc

factor float64
driftFactor float64
timeoutFactor float64

quorum int

Expand Down Expand Up @@ -76,23 +77,31 @@ func (m *Mutex) LockContext(ctx context.Context) error {

start := time.Now()

n, err := m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.acquire(ctx, pool, value)
})
n, err := func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.acquire(ctx, pool, value)
})
}()
if n == 0 && err != nil {
return err
}

now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.factor)))
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if n >= m.quorum && now.Before(until) {
m.value = value
m.until = until
return nil
}
_, err = m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
_, err = func() (int, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(int64(float64(m.expiry)*m.timeoutFactor)))
defer cancel()
return m.actOnPoolsAsync(func(pool redis.Pool) (bool, error) {
return m.release(ctx, pool, value)
})
}()
if i == m.tries-1 && err != nil {
return err
}
Expand Down Expand Up @@ -132,7 +141,7 @@ func (m *Mutex) ExtendContext(ctx context.Context) (bool, error) {
return false, err
}
now := time.Now()
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.factor)))
until := now.Add(m.expiry - now.Sub(start) - time.Duration(int64(float64(m.expiry)*m.driftFactor)))
if now.Before(until) {
m.until = until
return true, nil
Expand Down
17 changes: 9 additions & 8 deletions mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,15 @@ func newTestMutexes(pools []redis.Pool, name string, n int) []*Mutex {
mutexes := make([]*Mutex, n)
for i := 0; i < n; i++ {
mutexes[i] = &Mutex{
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration { return 500 * time.Millisecond },
genValueFunc: genValue,
factor: 0.01,
quorum: len(pools)/2 + 1,
pools: pools,
name: name,
expiry: 8 * time.Second,
tries: 32,
delayFunc: func(tries int) time.Duration { return 500 * time.Millisecond },
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(pools)/2 + 1,
pools: pools,
}
}
return mutexes
Expand Down
18 changes: 13 additions & 5 deletions redsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@ func (r *Redsync) NewMutex(name string, options ...Option) *Mutex {
delayFunc: func(tries int) time.Duration {
return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond
},
genValueFunc: genValue,
factor: 0.01,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
genValueFunc: genValue,
driftFactor: 0.01,
timeoutFactor: 0.05,
quorum: len(r.pools)/2 + 1,
pools: r.pools,
}
for _, o := range options {
o.Apply(m)
Expand Down Expand Up @@ -90,7 +91,14 @@ func WithRetryDelayFunc(delayFunc DelayFunc) Option {
// WithDriftFactor can be used to set the clock drift factor.
func WithDriftFactor(factor float64) Option {
return OptionFunc(func(m *Mutex) {
m.factor = factor
m.driftFactor = factor
})
}

// WithTimeoutFactor can be used to set the timeout factor.
func WithTimeoutFactor(factor float64) Option {
return OptionFunc(func(m *Mutex) {
m.timeoutFactor = factor
})
}

Expand Down

0 comments on commit 52511c8

Please sign in to comment.