This repository has been archived by the owner on Feb 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 335
/
abc.py
157 lines (119 loc) · 3.74 KB
/
abc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""The module provides connection and connections pool interfaces.
These are intended to be used for implementing custom connection managers.
"""
import abc
import asyncio
__all__ = [
'AbcConnection',
'AbcPool',
'AbcChannel',
]
class AbcConnection(abc.ABC):
"""Abstract connection interface."""
@abc.abstractmethod
def execute(self, command, *args, **kwargs):
"""Execute redis command."""
@abc.abstractmethod
def execute_pubsub(self, command, *args, **kwargs):
"""Execute Redis (p)subscribe/(p)unsubscribe commands."""
@abc.abstractmethod
def close(self):
"""Perform connection(s) close and resources cleanup."""
@asyncio.coroutine
@abc.abstractmethod
def wait_closed(self):
"""
Coroutine waiting until all resources are closed/released/cleaned up.
"""
@property
@abc.abstractmethod
def closed(self):
"""Flag indicating if connection is closing or already closed."""
@property
@abc.abstractmethod
def db(self):
"""Current selected DB index."""
@property
@abc.abstractmethod
def encoding(self):
"""Current set connection codec."""
@property
@abc.abstractmethod
def in_pubsub(self):
"""Returns number of subscribed channels.
Can be tested as bool indicating Pub/Sub mode state.
"""
@property
@abc.abstractmethod
def pubsub_channels(self):
"""Read-only channels dict."""
@property
@abc.abstractmethod
def pubsub_patterns(self):
"""Read-only patterns dict."""
@property
@abc.abstractmethod
def address(self):
"""Connection address."""
class AbcPool(AbcConnection):
"""Abstract connections pool interface.
Inherited from AbcConnection so both have common interface
for executing Redis commands.
"""
@abc.abstractmethod
def get_connection(self, command, args=()):
"""
Gets free connection from pool in a sync way.
If no connection available — returns None.
"""
@asyncio.coroutine
@abc.abstractmethod
def acquire(self): # TODO: arguments
"""Acquires connection from pool."""
@abc.abstractmethod
def release(self, conn): # TODO: arguments
"""Releases connection to pool.
:param AbcConnection conn: Owned connection to be released.
"""
@property
@abc.abstractmethod
def address(self):
"""Connection address or None."""
class AbcChannel(abc.ABC):
"""Abstract Pub/Sub Channel interface."""
@property
@abc.abstractmethod
def name(self):
"""Encoded channel name or pattern."""
@property
@abc.abstractmethod
def is_pattern(self):
"""Boolean flag indicating if channel is pattern channel."""
@property
@abc.abstractmethod
def is_active(self):
"""Flag indicating that channel has unreceived messages
and not marked as closed."""
@asyncio.coroutine
@abc.abstractmethod
def get(self):
"""Wait and return new message.
Will raise ``ChannelClosedError`` if channel is not active.
"""
# wait_message is not required; details of implementation
# @abc.abstractmethod
# def wait_message(self):
# pass
@abc.abstractmethod
def put_nowait(self, data):
"""Send data to channel.
Called by RedisConnection when new message received.
For pattern subscriptions data will be a tuple of
channel name and message itself.
"""
@abc.abstractmethod
def close(self, exc=None):
"""Marks Channel as closed, no more messages will be sent to it.
Called by RedisConnection when channel is unsubscribed
or connection is closed.
"""