Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: move connect to json pipe #1580

Merged
merged 1 commit into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion meta.yaml
Expand Up @@ -25,7 +25,6 @@ requirements:
- python
- greenlet ==1.1.3
- pyee ==8.1.0
- websockets ==10.1
- typing_extensions # [py<39]
test:
requires:
Expand Down
18 changes: 14 additions & 4 deletions playwright/_impl/_browser_type.py
Expand Up @@ -42,7 +42,7 @@
ServiceWorkersPolicy,
locals_to_params,
)
from playwright._impl._transport import WebSocketTransport
from playwright._impl._json_pipe import JsonPipeTransport
from playwright._impl._wait_helper import throw_on_timeout

if TYPE_CHECKING:
Expand Down Expand Up @@ -188,12 +188,22 @@ async def connect(
) -> Browser:
if timeout is None:
timeout = 30000
if slow_mo is None:
slow_mo = 0

headers = {**(headers if headers else {}), "x-playwright-browser": self.name}

transport = WebSocketTransport(
self._connection._loop, ws_endpoint, headers, slow_mo
local_utils = self._connection.local_utils
pipe_channel = await local_utils._channel.send(
"connect",
{
"wsEndpoint": ws_endpoint,
"headers": headers,
"slowMo": slow_mo,
"timeout": timeout,
},
)
transport = JsonPipeTransport(self._connection._loop, pipe_channel)

connection = Connection(
self._connection._dispatcher_fiber,
self._connection._object_factory,
Expand Down
13 changes: 11 additions & 2 deletions playwright/_impl/_connection.py
Expand Up @@ -316,16 +316,25 @@ def dispatch(self, msg: ParsedMessagePayload) -> None:
self._objects[guid]._dispose()
return
object = self._objects[guid]
should_replace_guids_with_channels = "jsonPipe@" not in guid
try:
if self._is_sync:
for listener in object._channel.listeners(method):
# Each event handler is a potentilly blocking context, create a fiber for each
# and switch to them in order, until they block inside and pass control to each
# other and then eventually back to dispatcher as listener functions return.
g = greenlet(listener)
g.switch(self._replace_guids_with_channels(params))
if should_replace_guids_with_channels:
g.switch(self._replace_guids_with_channels(params))
else:
g.switch(params)
else:
object._channel.emit(method, self._replace_guids_with_channels(params))
if should_replace_guids_with_channels:
object._channel.emit(
method, self._replace_guids_with_channels(params)
)
else:
object._channel.emit(method, params)
except BaseException as exc:
print("Error occurred in event listener", file=sys.stderr)
traceback.print_exc()
Expand Down
79 changes: 79 additions & 0 deletions playwright/_impl/_json_pipe.py
@@ -0,0 +1,79 @@
# Copyright (c) Microsoft Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import Dict, Optional, cast

from pyee import AsyncIOEventEmitter

from playwright._impl._api_types import Error
from playwright._impl._connection import Channel
from playwright._impl._helper import ParsedMessagePayload, parse_error
from playwright._impl._transport import Transport


class JsonPipeTransport(AsyncIOEventEmitter, Transport):
def __init__(
self,
loop: asyncio.AbstractEventLoop,
pipe_channel: Channel,
) -> None:
super().__init__(loop)
Transport.__init__(self, loop)
self._stop_requested = False
self._pipe_channel = pipe_channel

def request_stop(self) -> None:
self._stop_requested = True
self._loop.create_task(self._pipe_channel.send("close", {}))

def dispose(self) -> None:
self.on_error_future.cancel()
self._stopped_future.cancel()

async def wait_until_stopped(self) -> None:
await self._stopped_future

async def connect(self) -> None:
self._stopped_future: asyncio.Future = asyncio.Future()

def handle_message(message: Dict) -> None:
if not self._stop_requested:
self.on_message(cast(ParsedMessagePayload, message))

def handle_closed(error: Optional[Dict]) -> None:
self.emit("close")
self.on_error_future.set_exception(
parse_error(error["error"])
if error
else Error("Playwright connection closed")
)
self._stopped_future.set_result(None)

self._pipe_channel.on(
"message",
lambda params: handle_message(params["message"]),
)
self._pipe_channel.on(
"closed",
lambda params: handle_closed(params.get("error")),
)

async def run(self) -> None:
await self._stopped_future

def send(self, message: Dict) -> None:
if self._stop_requested:
raise Error("Playwright connection closed")
self._loop.create_task(self._pipe_channel.send("send", {"message": message}))
78 changes: 0 additions & 78 deletions playwright/_impl/_transport.py
Expand Up @@ -22,12 +22,6 @@
from pathlib import Path
from typing import Callable, Dict, Optional, Union

import websockets
import websockets.exceptions
from pyee import AsyncIOEventEmitter
from websockets.client import connect as websocket_connect

from playwright._impl._api_types import Error
from playwright._impl._driver import get_driver_env
from playwright._impl._helper import ParsedMessagePayload

Expand Down Expand Up @@ -178,75 +172,3 @@ def send(self, message: Dict) -> None:
self._output.write(
len(data).to_bytes(4, byteorder="little", signed=False) + data
)


class WebSocketTransport(AsyncIOEventEmitter, Transport):
def __init__(
self,
loop: asyncio.AbstractEventLoop,
ws_endpoint: str,
headers: Dict[str, str] = None,
slow_mo: float = None,
) -> None:
super().__init__(loop)
Transport.__init__(self, loop)

self._stopped = False
self.ws_endpoint = ws_endpoint
self.headers = headers
self.slow_mo = slow_mo

def request_stop(self) -> None:
self._stopped = True
self.emit("close")
self._loop.create_task(self._connection.close())

def dispose(self) -> None:
self.on_error_future.cancel()

async def wait_until_stopped(self) -> None:
await self._connection.wait_closed()

async def connect(self) -> None:
try:
self._connection = await websocket_connect(
self.ws_endpoint,
extra_headers=self.headers,
max_size=256 * 1024 * 1024, # 256Mb
)
except Exception as exc:
self.on_error_future.set_exception(Error(f"websocket.connect: {str(exc)}"))
raise exc

async def run(self) -> None:
while not self._stopped:
try:
message = await self._connection.recv()
if self.slow_mo is not None:
await asyncio.sleep(self.slow_mo / 1000)
if self._stopped:
self.on_error_future.set_exception(
Error("Playwright connection closed")
)
break
obj = self.deserialize_message(message)
self.on_message(obj)
except (
websockets.exceptions.ConnectionClosed,
websockets.exceptions.ConnectionClosedError,
):
if not self._stopped:
self.emit("close")
self.on_error_future.set_exception(
Error("Playwright connection closed")
)
break
except Exception as exc:
self.on_error_future.set_exception(exc)
break

def send(self, message: Dict) -> None:
if self._stopped or (hasattr(self, "_connection") and self._connection.closed):
raise Error("Playwright connection closed")
data = self.serialize_message(message)
self._loop.create_task(self._connection.send(data))
1 change: 0 additions & 1 deletion setup.py
Expand Up @@ -211,7 +211,6 @@ def _download_and_extract_local_driver(
packages=["playwright"],
include_package_data=True,
install_requires=[
"websockets==10.1",
"greenlet==1.1.3",
"pyee==8.1.0",
"typing-extensions;python_version<='3.8'",
Expand Down
2 changes: 1 addition & 1 deletion tests/async/test_browsertype_connect.py
Expand Up @@ -213,7 +213,7 @@ async def test_connect_to_closed_server_without_hangs(
remote_server.kill()
with pytest.raises(Error) as exc:
await browser_type.connect(remote_server.ws_endpoint)
assert "websocket.connect: " in exc.value.message
assert "WebSocket error: " in exc.value.message


async def test_should_fulfill_with_global_fetch_result(
Expand Down
2 changes: 1 addition & 1 deletion tests/sync/test_browsertype_connect.py
Expand Up @@ -192,7 +192,7 @@ def test_connect_to_closed_server_without_hangs(
remote_server.kill()
with pytest.raises(Error) as exc:
browser_type.connect(remote_server.ws_endpoint)
assert "websocket.connect: " in exc.value.message
assert "WebSocket error: " in exc.value.message


def test_browser_type_connect_should_fulfill_with_global_fetch_result(
Expand Down