/
PreventOverlappingJobs.php
138 lines (120 loc) · 2.68 KB
/
PreventOverlappingJobs.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
<?php
namespace Illuminate\Queue\Middleware;
use Illuminate\Container\Container;
use Illuminate\Contracts\Cache\Repository as Cache;
class PreventOverlappingJobs
{
    /**
     * The number of seconds after which the lock expires.
     *
     * Despite the property name this is a duration, not a timestamp: it is
     * passed unchanged as the second argument of the cache lock() call.
     * NOTE(review): the default of 0 presumably means the lock never expires
     * on its own; confirm against the cache lock implementation in use.
     *
     * @var int
     */
    public $expiresAt;

    /**
     * The user supplied part of the lock key.
     *
     * Jobs of the same class sharing this key are prevented from overlapping.
     *
     * @var string
     */
    public $key;

    /**
     * The prefix of the lock key.
     *
     * @var string
     */
    public $prefix = 'overlap:';

    /**
     * The delay (in seconds) before a blocked job is released back to the queue.
     *
     * When null, a job that cannot obtain the lock is not released by this
     * middleware at all.
     *
     * @var int|null
     */
    public $releaseAfter;

    /**
     * Create a new overlapping jobs middleware instance.
     *
     * @param  string  $key  identifier appended to the job class name in the lock key
     * @param  int|null  $releaseAfter  release delay in seconds, or null to never release
     * @param  int  $expiresAt  lifetime of the lock in seconds
     * @return void
     */
    public function __construct($key = '', $releaseAfter = 0, $expiresAt = 0)
    {
        $this->key = $key;
        $this->releaseAfter = $releaseAfter;
        $this->expiresAt = $expiresAt;
    }

    /**
     * Process the job.
     *
     * Runs the rest of the pipeline only when the lock for this job could be
     * acquired. Otherwise the job is released back to the queue after the
     * configured delay, or left untouched when releasing has been disabled.
     *
     * @param  mixed  $job
     * @param  callable  $next
     * @return mixed  the result of $next when the lock was acquired, null otherwise
     */
    public function handle($job, $next)
    {
        $lock = Container::getInstance()->make(Cache::class)->lock(
            $this->getLockKey($job), $this->expiresAt
        );

        if ($lock->get()) {
            try {
                // Hand the pipeline result back to the caller instead of dropping it.
                return $next($job);
            } finally {
                // Runs on both normal return and exception so the lock is never leaked here.
                $lock->release();
            }
        }

        if (! is_null($this->releaseAfter)) {
            $job->release($this->releaseAfter);
        }
    }

    /**
     * Do not release the job back to the queue when the lock is unavailable.
     *
     * @return $this
     */
    public function dontRelease()
    {
        $this->releaseAfter = null;

        return $this;
    }

    /**
     * Set the lifetime (in seconds) of the lock.
     *
     * The value is a duration, not a point in time.
     *
     * @param  int  $expiresAt
     * @return $this
     */
    public function expireAt($expiresAt)
    {
        $this->expiresAt = $expiresAt;

        return $this;
    }

    /**
     * Set the delay (in seconds) to release the job back to the queue.
     *
     * @param  int  $releaseAfter
     * @return $this
     */
    public function releaseAfter($releaseAfter)
    {
        $this->releaseAfter = $releaseAfter;

        return $this;
    }

    /**
     * Set the prefix of the lock key.
     *
     * @param  string  $prefix
     * @return $this
     */
    public function withPrefix($prefix)
    {
        $this->prefix = $prefix;

        return $this;
    }

    /**
     * Get the lock key for the given job.
     *
     * The key is built as "<prefix><job class>:<key>".
     *
     * @param  mixed  $job
     * @return string
     */
    public function getLockKey($job)
    {
        return $this->prefix.get_class($job).':'.$this->key;
    }
}