Skip to content

Commit

Permalink
Rename connection to ws_proto.
Browse files Browse the repository at this point in the history
This is consistent with the io_proto instance variable and with the type
of the variable: it's a websockets.server.ServerProtocol.
  • Loading branch information
aaugustin committed Nov 27, 2022
1 parent 65409f3 commit 4ee4a34
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 44 deletions.
2 changes: 1 addition & 1 deletion sanic/server/protocols/websocket_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def close_if_idle(self):
# Called by Sanic Server when shutting down
# If we've upgraded to websocket, shut it down
if self.websocket is not None:
if self.websocket.connection.state in (CLOSING, CLOSED):
if self.websocket.ws_proto.state in (CLOSING, CLOSED):
return True
elif self.websocket.loop is not None:
self.websocket.loop.create_task(self.websocket.close(1001))
Expand Down
86 changes: 43 additions & 43 deletions sanic/server/websockets/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@


class WebsocketImplProtocol:
connection: ServerProtocol
ws_proto: ServerProtocol
io_proto: Optional[SanicProtocol]
loop: Optional[asyncio.AbstractEventLoop]
max_queue: int
Expand All @@ -63,14 +63,14 @@ class WebsocketImplProtocol:

def __init__(
self,
connection,
ws_proto,
max_queue=None,
ping_interval: Optional[float] = 20,
ping_timeout: Optional[float] = 20,
close_timeout: float = 10,
loop=None,
):
self.connection = connection
self.ws_proto = ws_proto
self.io_proto = None
self.loop = None
self.max_queue = max_queue
Expand All @@ -92,7 +92,7 @@ def __init__(

@property
def subprotocol(self):
return self.connection.subprotocol
return self.ws_proto.subprotocol

def pause_frames(self):
if not self.can_pause:
Expand Down Expand Up @@ -306,15 +306,15 @@ def fail_connection(self, code: int = 1006, reason: str = "") -> bool:
# Not draining the write buffer is acceptable in this context.

# clear the send buffer
_ = self.connection.data_to_send()
_ = self.ws_proto.data_to_send()
# If we're not already CLOSED or CLOSING, then send the close.
if self.connection.state is OPEN:
if self.ws_proto.state is OPEN:
if code in (1000, 1001):
self.connection.send_close(code, reason)
self.ws_proto.send_close(code, reason)
else:
self.connection.fail(code, reason)
self.ws_proto.fail(code, reason)
try:
data_to_send = self.connection.data_to_send()
data_to_send = self.ws_proto.data_to_send()
while (
len(data_to_send)
and self.io_proto
Expand All @@ -328,7 +328,7 @@ def fail_connection(self, code: int = 1006, reason: str = "") -> bool:
...
if code == 1006:
# Special case: 1006 consider the transport already closed
self.connection.state = CLOSED
self.ws_proto.state = CLOSED
if self.data_finished_fut and not self.data_finished_fut.done():
# We have a graceful auto-closer. Use it to close the connection.
self.data_finished_fut.cancel()
Expand All @@ -349,10 +349,10 @@ def end_connection(self, code=1000, reason=""):
# In Python Version 3.7: pause_reading is idempotent
# i.e. it can be called when the transport is already paused or closed.
self.io_proto.transport.pause_reading()
if self.connection.state == OPEN:
data_to_send = self.connection.data_to_send()
self.connection.send_close(code, reason)
data_to_send.extend(self.connection.data_to_send())
if self.ws_proto.state == OPEN:
data_to_send = self.ws_proto.data_to_send()
self.ws_proto.send_close(code, reason)
data_to_send.extend(self.ws_proto.data_to_send())
try:
while (
len(data_to_send)
Expand Down Expand Up @@ -461,7 +461,7 @@ def abort_pings(self) -> None:
Raise ConnectionClosed in pending keepalive pings.
They'll never receive a pong once the connection is closed.
"""
if self.connection.state is not CLOSED:
if self.ws_proto.state is not CLOSED:
raise ServerError(
"Webscoket about_pings should only be called "
"after connection state is changed to CLOSED"
Expand Down Expand Up @@ -490,9 +490,9 @@ async def close(self, code: int = 1000, reason: str = "") -> None:
self.fail_connection(code, reason)
return
async with self.conn_mutex:
if self.connection.state is OPEN:
self.connection.send_close(code, reason)
data_to_send = self.connection.data_to_send()
if self.ws_proto.state is OPEN:
self.ws_proto.send_close(code, reason)
data_to_send = self.ws_proto.data_to_send()
await self.send_data(data_to_send)

async def recv(self, timeout: Optional[float] = None) -> Optional[Data]:
Expand Down Expand Up @@ -522,7 +522,7 @@ async def recv(self, timeout: Optional[float] = None) -> Optional[Data]:
"already waiting for the next message"
)
await self.recv_lock.acquire()
if self.connection.state is CLOSED:
if self.ws_proto.state is CLOSED:
self.recv_lock.release()
raise WebsocketClosed(
"Cannot receive from websocket interface after it is closed."
Expand Down Expand Up @@ -573,7 +573,7 @@ async def recv_burst(self, max_recv=256) -> Sequence[Data]:
"for the next message"
)
await self.recv_lock.acquire()
if self.connection.state is CLOSED:
if self.ws_proto.state is CLOSED:
self.recv_lock.release()
raise WebsocketClosed(
"Cannot receive from websocket interface after it is closed."
Expand Down Expand Up @@ -632,7 +632,7 @@ async def recv_streaming(self) -> AsyncIterator[Data]:
"is already waiting for the next message"
)
await self.recv_lock.acquire()
if self.connection.state is CLOSED:
if self.ws_proto.state is CLOSED:
self.recv_lock.release()
raise WebsocketClosed(
"Cannot receive from websocket interface after it is closed."
Expand Down Expand Up @@ -673,7 +673,7 @@ async def send(self, message: Union[Data, Iterable[Data]]) -> None:
"""
async with self.conn_mutex:

if self.connection.state in (CLOSED, CLOSING):
if self.ws_proto.state in (CLOSED, CLOSING):
raise WebsocketClosed(
"Cannot write to websocket interface after it is closed."
)
Expand All @@ -686,12 +686,12 @@ async def send(self, message: Union[Data, Iterable[Data]]) -> None:
# strings and bytes-like objects are iterable.

if isinstance(message, str):
self.connection.send_text(message.encode("utf-8"))
await self.send_data(self.connection.data_to_send())
self.ws_proto.send_text(message.encode("utf-8"))
await self.send_data(self.ws_proto.data_to_send())

elif isinstance(message, (bytes, bytearray, memoryview)):
self.connection.send_binary(message)
await self.send_data(self.connection.data_to_send())
self.ws_proto.send_binary(message)
await self.send_data(self.ws_proto.data_to_send())

elif isinstance(message, Mapping):
# Catch a common mistake -- passing a dict to send().
Expand Down Expand Up @@ -720,7 +720,7 @@ async def ping(self, data: Optional[Data] = None) -> asyncio.Future:
(which will be encoded to UTF-8) or a bytes-like object.
"""
async with self.conn_mutex:
if self.connection.state in (CLOSED, CLOSING):
if self.ws_proto.state in (CLOSED, CLOSING):
raise WebsocketClosed(
"Cannot send a ping when the websocket interface "
"is closed."
Expand Down Expand Up @@ -748,8 +748,8 @@ async def ping(self, data: Optional[Data] = None) -> asyncio.Future:

self.pings[data] = self.io_proto.loop.create_future()

self.connection.send_ping(data)
await self.send_data(self.connection.data_to_send())
self.ws_proto.send_ping(data)
await self.send_data(self.ws_proto.data_to_send())

return asyncio.shield(self.pings[data])

Expand All @@ -761,15 +761,15 @@ async def pong(self, data: Data = b"") -> None:
be a string (which will be encoded to UTF-8) or a bytes-like object.
"""
async with self.conn_mutex:
if self.connection.state in (CLOSED, CLOSING):
if self.ws_proto.state in (CLOSED, CLOSING):
# Cannot send pong after transport is shutting down
return
if isinstance(data, str):
data = data.encode("utf-8")
elif isinstance(data, (bytearray, memoryview)):
data = bytes(data)
self.connection.send_pong(data)
await self.send_data(self.connection.data_to_send())
self.ws_proto.send_pong(data)
await self.send_data(self.ws_proto.data_to_send())

async def send_data(self, data_to_send):
for data in data_to_send:
Expand All @@ -791,17 +791,17 @@ async def send_data(self, data_to_send):
SanicProtocol.close(self.io_proto, timeout=1.0)

async def async_data_received(self, data_to_send, events_to_process):
if self.connection.state in (OPEN, CLOSING) and len(data_to_send) > 0:
if self.ws_proto.state in (OPEN, CLOSING) and len(data_to_send) > 0:
# receiving data can generate data to send (eg, pong for a ping)
# send connection.data_to_send()
await self.send_data(data_to_send)
if len(events_to_process) > 0:
await self.process_events(events_to_process)

def data_received(self, data):
self.connection.receive_data(data)
data_to_send = self.connection.data_to_send()
events_to_process = self.connection.events_received()
self.ws_proto.receive_data(data)
data_to_send = self.ws_proto.data_to_send()
events_to_process = self.ws_proto.events_received()
if len(data_to_send) > 0 or len(events_to_process) > 0:
asyncio.create_task(
self.async_data_received(data_to_send, events_to_process)
Expand All @@ -810,7 +810,7 @@ def data_received(self, data):
async def async_eof_received(self, data_to_send, events_to_process):
# receiving EOF can generate data to send
# send connection.data_to_send()
if self.connection.state in (OPEN, CLOSING) and len(data_to_send) > 0:
if self.ws_proto.state in (OPEN, CLOSING) and len(data_to_send) > 0:
await self.send_data(data_to_send)
if len(events_to_process) > 0:
await self.process_events(events_to_process)
Expand All @@ -830,9 +830,9 @@ async def async_eof_received(self, data_to_send, events_to_process):
SanicProtocol.close(self.io_proto, timeout=1.0)

def eof_received(self) -> Optional[bool]:
self.connection.receive_eof()
data_to_send = self.connection.data_to_send()
events_to_process = self.connection.events_received()
self.ws_proto.receive_eof()
data_to_send = self.ws_proto.data_to_send()
events_to_process = self.ws_proto.events_received()
asyncio.create_task(
self.async_eof_received(data_to_send, events_to_process)
)
Expand All @@ -842,11 +842,11 @@ def connection_lost(self, exc):
"""
The WebSocket Connection is Closed.
"""
if not self.connection.state == CLOSED:
if not self.ws_proto.state == CLOSED:
# signal to the websocket connection handler
# we've lost the connection
self.connection.fail(code=1006)
self.connection.state = CLOSED
self.ws_proto.fail(code=1006)
self.ws_proto.state = CLOSED

self.abort_pings()
if self.connection_lost_waiter:
Expand Down

0 comments on commit 4ee4a34

Please sign in to comment.