Skip to content

Commit

Permalink
chore: move connect to json pipe (#1580)
Browse files Browse the repository at this point in the history
  • Loading branch information
pavelfeldman committed Oct 6, 2022
1 parent d2dad56 commit bddee13
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 88 deletions.
1 change: 0 additions & 1 deletion meta.yaml
Expand Up @@ -25,7 +25,6 @@ requirements:
- python
- greenlet ==1.1.3
- pyee ==8.1.0
- websockets ==10.1
- typing_extensions # [py<39]
test:
requires:
Expand Down
18 changes: 14 additions & 4 deletions playwright/_impl/_browser_type.py
Expand Up @@ -42,7 +42,7 @@
ServiceWorkersPolicy,
locals_to_params,
)
from playwright._impl._transport import WebSocketTransport
from playwright._impl._json_pipe import JsonPipeTransport
from playwright._impl._wait_helper import throw_on_timeout

if TYPE_CHECKING:
Expand Down Expand Up @@ -188,12 +188,22 @@ async def connect(
) -> Browser:
if timeout is None:
timeout = 30000
if slow_mo is None:
slow_mo = 0

headers = {**(headers if headers else {}), "x-playwright-browser": self.name}

transport = WebSocketTransport(
self._connection._loop, ws_endpoint, headers, slow_mo
local_utils = self._connection.local_utils
pipe_channel = await local_utils._channel.send(
"connect",
{
"wsEndpoint": ws_endpoint,
"headers": headers,
"slowMo": slow_mo,
"timeout": timeout,
},
)
transport = JsonPipeTransport(self._connection._loop, pipe_channel)

connection = Connection(
self._connection._dispatcher_fiber,
self._connection._object_factory,
Expand Down
13 changes: 11 additions & 2 deletions playwright/_impl/_connection.py
Expand Up @@ -316,16 +316,25 @@ def dispatch(self, msg: ParsedMessagePayload) -> None:
self._objects[guid]._dispose()
return
object = self._objects[guid]
should_replace_guids_with_channels = "jsonPipe@" not in guid
try:
if self._is_sync:
for listener in object._channel.listeners(method):
# Each event handler is a potentilly blocking context, create a fiber for each
# and switch to them in order, until they block inside and pass control to each
# other and then eventually back to dispatcher as listener functions return.
g = greenlet(listener)
g.switch(self._replace_guids_with_channels(params))
if should_replace_guids_with_channels:
g.switch(self._replace_guids_with_channels(params))
else:
g.switch(params)
else:
object._channel.emit(method, self._replace_guids_with_channels(params))
if should_replace_guids_with_channels:
object._channel.emit(
method, self._replace_guids_with_channels(params)
)
else:
object._channel.emit(method, params)
except BaseException as exc:
print("Error occurred in event listener", file=sys.stderr)
traceback.print_exc()
Expand Down
79 changes: 79 additions & 0 deletions playwright/_impl/_json_pipe.py
@@ -0,0 +1,79 @@
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import Dict, Optional, cast

from pyee import AsyncIOEventEmitter

from playwright._impl._api_types import Error
from playwright._impl._connection import Channel
from playwright._impl._helper import ParsedMessagePayload, parse_error
from playwright._impl._transport import Transport


class JsonPipeTransport(AsyncIOEventEmitter, Transport):
def __init__(
self,
loop: asyncio.AbstractEventLoop,
pipe_channel: Channel,
) -> None:
super().__init__(loop)
Transport.__init__(self, loop)
self._stop_requested = False
self._pipe_channel = pipe_channel

def request_stop(self) -> None:
self._stop_requested = True
self._loop.create_task(self._pipe_channel.send("close", {}))

def dispose(self) -> None:
self.on_error_future.cancel()
self._stopped_future.cancel()

async def wait_until_stopped(self) -> None:
await self._stopped_future

async def connect(self) -> None:
self._stopped_future: asyncio.Future = asyncio.Future()

def handle_message(message: Dict) -> None:
if not self._stop_requested:
self.on_message(cast(ParsedMessagePayload, message))

def handle_closed(error: Optional[Dict]) -> None:
self.emit("close")
self.on_error_future.set_exception(
parse_error(error["error"])
if error
else Error("Playwright connection closed")
)
self._stopped_future.set_result(None)

self._pipe_channel.on(
"message",
lambda params: handle_message(params["message"]),
)
self._pipe_channel.on(
"closed",
lambda params: handle_closed(params.get("error")),
)

async def run(self) -> None:
await self._stopped_future

def send(self, message: Dict) -> None:
if self._stop_requested:
raise Error("Playwright connection closed")
self._loop.create_task(self._pipe_channel.send("send", {"message": message}))
78 changes: 0 additions & 78 deletions playwright/_impl/_transport.py
Expand Up @@ -22,12 +22,6 @@
from pathlib import Path
from typing import Callable, Dict, Optional, Union

import websockets
import websockets.exceptions
from pyee import AsyncIOEventEmitter
from websockets.client import connect as websocket_connect

from playwright._impl._api_types import Error
from playwright._impl._driver import get_driver_env
from playwright._impl._helper import ParsedMessagePayload

Expand Down Expand Up @@ -178,75 +172,3 @@ def send(self, message: Dict) -> None:
self._output.write(
len(data).to_bytes(4, byteorder="little", signed=False) + data
)


class WebSocketTransport(AsyncIOEventEmitter, Transport):
def __init__(
self,
loop: asyncio.AbstractEventLoop,
ws_endpoint: str,
headers: Dict[str, str] = None,
slow_mo: float = None,
) -> None:
super().__init__(loop)
Transport.__init__(self, loop)

self._stopped = False
self.ws_endpoint = ws_endpoint
self.headers = headers
self.slow_mo = slow_mo

def request_stop(self) -> None:
self._stopped = True
self.emit("close")
self._loop.create_task(self._connection.close())

def dispose(self) -> None:
self.on_error_future.cancel()

async def wait_until_stopped(self) -> None:
await self._connection.wait_closed()

async def connect(self) -> None:
try:
self._connection = await websocket_connect(
self.ws_endpoint,
extra_headers=self.headers,
max_size=256 * 1024 * 1024, # 256Mb
)
except Exception as exc:
self.on_error_future.set_exception(Error(f"websocket.connect: {str(exc)}"))
raise exc

async def run(self) -> None:
while not self._stopped:
try:
message = await self._connection.recv()
if self.slow_mo is not None:
await asyncio.sleep(self.slow_mo / 1000)
if self._stopped:
self.on_error_future.set_exception(
Error("Playwright connection closed")
)
break
obj = self.deserialize_message(message)
self.on_message(obj)
except (
websockets.exceptions.ConnectionClosed,
websockets.exceptions.ConnectionClosedError,
):
if not self._stopped:
self.emit("close")
self.on_error_future.set_exception(
Error("Playwright connection closed")
)
break
except Exception as exc:
self.on_error_future.set_exception(exc)
break

def send(self, message: Dict) -> None:
if self._stopped or (hasattr(self, "_connection") and self._connection.closed):
raise Error("Playwright connection closed")
data = self.serialize_message(message)
self._loop.create_task(self._connection.send(data))
1 change: 0 additions & 1 deletion setup.py
Expand Up @@ -211,7 +211,6 @@ def _download_and_extract_local_driver(
packages=["playwright"],
include_package_data=True,
install_requires=[
"websockets==10.1",
"greenlet==1.1.3",
"pyee==8.1.0",
"typing-extensions;python_version<='3.8'",
Expand Down
2 changes: 1 addition & 1 deletion tests/async/test_browsertype_connect.py
Expand Up @@ -213,7 +213,7 @@ async def test_connect_to_closed_server_without_hangs(
remote_server.kill()
with pytest.raises(Error) as exc:
await browser_type.connect(remote_server.ws_endpoint)
assert "websocket.connect: " in exc.value.message
assert "WebSocket error: " in exc.value.message


async def test_should_fulfill_with_global_fetch_result(
Expand Down
2 changes: 1 addition & 1 deletion tests/sync/test_browsertype_connect.py
Expand Up @@ -192,7 +192,7 @@ def test_connect_to_closed_server_without_hangs(
remote_server.kill()
with pytest.raises(Error) as exc:
browser_type.connect(remote_server.ws_endpoint)
assert "websocket.connect: " in exc.value.message
assert "WebSocket error: " in exc.value.message


def test_browser_type_connect_should_fulfill_with_global_fetch_result(
Expand Down

0 comments on commit bddee13

Please sign in to comment.