forked from benoitc/gunicorn
/
test_sock.py
56 lines (46 loc) · 1.64 KB
/
test_sock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import fcntl
try:
import unittest.mock as mock
except ImportError:
import mock
from gunicorn import sock
@mock.patch('fcntl.lockf')
@mock.patch('socket.fromfd')
def test_unix_socket_init_lock(fromfd, lockf):
    """Check the lock requested while a ``UnixSocket`` is constructed.

    With ``socket.fromfd`` and ``fcntl.lockf`` patched, building a
    ``UnixSocket`` must leave ``lockf`` last called with the object returned
    by ``socket.fromfd`` and the flags ``LOCK_SH | LOCK_NB``.
    """
    fake_socket = fromfd.return_value
    shared_nonblocking = fcntl.LOCK_SH | fcntl.LOCK_NB

    sock.UnixSocket('test.sock', mock.Mock(), mock.Mock(), mock.Mock())

    lockf.assert_called_with(fake_socket, shared_nonblocking)
@mock.patch('fcntl.lockf')
@mock.patch('os.getpid')
@mock.patch('os.unlink')
@mock.patch('socket.fromfd')
def test_unix_socket_close_delete_if_exlock(fromfd, unlink, getpid, lockf):
    """Check that ``close()`` unlinks the path when ``lockf`` succeeds.

    ``fcntl.lockf`` is a plain mock here, so every lock call returns without
    raising. After ``close()`` the test expects ``lockf`` to have been called
    with the ``socket.fromfd`` result and ``LOCK_EX | LOCK_NB``, and
    ``os.unlink`` to have been called with ``'test.sock'``.
    """
    fake_socket = fromfd.return_value
    exclusive_nonblocking = fcntl.LOCK_EX | fcntl.LOCK_NB
    listener = sock.UnixSocket(
        'test.sock', mock.Mock(), mock.Mock(), mock.Mock())

    # Drop any lockf calls recorded during construction so the assertion
    # below only sees what close() did.
    lockf.reset_mock()
    listener.close()

    lockf.assert_called_with(fake_socket, exclusive_nonblocking)
    unlink.assert_called_with('test.sock')
@mock.patch('fcntl.lockf')
@mock.patch('os.getpid')
@mock.patch('os.unlink')
@mock.patch('socket.fromfd')
def test_unix_socket_close_keep_if_no_exlock(fromfd, unlink, getpid, lockf):
    """Check that ``close()`` leaves the path alone when ``lockf`` fails.

    After construction, ``fcntl.lockf`` is configured to raise
    ``IOError('locked')``. ``close()`` must still have attempted the lock
    with the ``socket.fromfd`` result and ``LOCK_EX | LOCK_NB``, and must
    not have called ``os.unlink``.
    """
    fake_socket = fromfd.return_value
    exclusive_nonblocking = fcntl.LOCK_EX | fcntl.LOCK_NB
    listener = sock.UnixSocket(
        'test.sock', mock.Mock(), mock.Mock(), mock.Mock())

    # Clear the calls recorded during construction, then make every further
    # lock attempt fail.
    lockf.reset_mock()
    lockf.side_effect = IOError('locked')
    listener.close()

    lockf.assert_called_with(fake_socket, exclusive_nonblocking)
    unlink.assert_not_called()
@mock.patch('fcntl.lockf')
@mock.patch('os.getpid')
@mock.patch('socket.fromfd')
def test_unix_socket_not_deleted_by_worker(fromfd, getpid, lockf):
    """Check that ``close()`` takes no lock once ``os.getpid`` changes.

    The socket is built while ``os.getpid`` returns one mock value; the
    return value is then swapped for a different mock before ``close()``
    runs. The test expects ``fcntl.lockf`` not to be called during that
    ``close()``.
    """
    inherited_fd = mock.Mock()
    listener = sock.UnixSocket('name', mock.Mock(), mock.Mock(), inherited_fd)

    # Start from clean call records on both mocks.
    lockf.reset_mock()
    getpid.reset_mock()

    # Fake a pid change: getpid now returns a value different from the one
    # seen during construction.
    getpid.return_value = mock.Mock()
    listener.close()

    lockf.assert_not_called()