-
-
Notifications
You must be signed in to change notification settings - Fork 73
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add post-maintenance reconnect helper for GMO Coin (#290)
- Loading branch information
Showing
7 changed files
with
363 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
import asyncio | ||
import os | ||
|
||
try: | ||
from rich import print | ||
except ImportError: | ||
pass | ||
|
||
import pybotters | ||
from pybotters.helpers import GMOCoinHelper | ||
|
||
|
||
async def main(): | ||
apis = { | ||
"gmocoin": [ | ||
os.getenv("GMOCOIN_API_KEY", ""), | ||
os.getenv("GMOCOIN_API_SECRET", ""), | ||
], | ||
} | ||
|
||
async with pybotters.Client(apis=apis) as client: | ||
store = pybotters.GMOCoinDataStore() | ||
|
||
# Create a helper instance for GMOCoin. | ||
gmohelper = GMOCoinHelper(client) | ||
|
||
# Alias for POST /private/v1/ws-auth . | ||
token = await gmohelper.create_access_token() | ||
|
||
ws = client.ws_connect( | ||
# Build the Private WebSocket URL. | ||
f"wss://api.coin.z.com/ws/private/v1/{token}", | ||
send_json={ | ||
"command": "subscribe", | ||
"channel": "positionSummaryEvents", | ||
"option": "PERIODIC", | ||
}, | ||
hdlr_json=store.onmessage, | ||
) | ||
|
||
# Create a task to manage WebSocket URL and access token. | ||
asyncio.create_task( | ||
gmohelper.manage_ws_token(ws, token), | ||
) | ||
|
||
with store.position_summary.watch() as stream: | ||
async for change in stream: | ||
print(change.data) | ||
|
||
|
||
if __name__ == "__main__": | ||
asyncio.run(main()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from __future__ import annotations | ||
|
||
from .gmocoin import GMOCoinHelper, GMOCoinResponseError | ||
|
||
__all__: tuple[str, ...] = ( | ||
"GMOCoinHelper", | ||
"GMOCoinResponseError", | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
import asyncio | ||
import logging | ||
from typing import NoReturn | ||
|
||
from ..client import Client | ||
from ..ws import WebSocketApp | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class GMOCoinHelper: | ||
def __init__( | ||
self, | ||
client: Client, | ||
) -> None: | ||
"""Post-maintenance reconnection helper for GMO Coin. | ||
Args: | ||
client (Client): pybotters.Client | ||
""" | ||
self._client = client | ||
self._url = removeprefix( | ||
"https://api.coin.z.com/private/v1/ws-auth", self._client._base_url | ||
) | ||
|
||
async def create_access_token(self) -> str: | ||
"""Helper for ``POST /private/v1/ws-auth``. | ||
Raises: | ||
GMOCoinResponseError: Response error. | ||
Returns: | ||
str: Created access token. | ||
""" | ||
r = await self._client.fetch( | ||
"POST", | ||
self._url, | ||
) | ||
|
||
if isinstance(r.data, dict) and r.data.get("status") == 0: | ||
token = r.data["data"] | ||
return token | ||
else: | ||
raise GMOCoinResponseError(r.text) | ||
|
||
async def extend_access_token(self, token: str) -> None: | ||
"""Helper for ``PUT /private/v1/ws-auth``. | ||
Args: | ||
token (str): Access token to extend | ||
Raises: | ||
GMOCoinResponseError: Response error. | ||
""" | ||
r = await self._client.fetch( | ||
"PUT", | ||
self._url, | ||
data={"token": token}, | ||
) | ||
|
||
if isinstance(r.data, dict) and r.data.get("status") == 0: | ||
return | ||
else: | ||
raise GMOCoinResponseError(r.text) | ||
|
||
async def manage_ws_token( | ||
self, | ||
ws: WebSocketApp, | ||
token: str, | ||
delay: float = 300.0, # 5 minutes | ||
) -> NoReturn: | ||
"""Manage the access token for the WebSocket connection. | ||
This method is a coroutine for an infinite loop. | ||
It should be executed by :meth:`asyncio.create_task`. | ||
Args: | ||
ws (WebSocketApp): WebSocketApp instance. | ||
token (str): Access token. | ||
delay (float, optional): Sleep time. Defaults to 300.0 (5 minutes). | ||
""" | ||
while True: | ||
try: | ||
await self.extend_access_token(token) | ||
except GMOCoinResponseError as e1: | ||
try: | ||
token = await self.create_access_token() | ||
except GMOCoinResponseError as e2: | ||
logger.debug( | ||
f"GMO Coin access token could not be extended or created: {e1} {e2}" | ||
) | ||
else: | ||
ws.url = f"wss://api.coin.z.com/ws/private/v1/{token}" | ||
logger.debug("GMO Coin Access Token has been created") | ||
else: | ||
logger.debug("GMO Coin Access Token has been extended") | ||
|
||
await asyncio.sleep(delay) | ||
|
||
|
||
class GMOCoinResponseError(Exception): ... | ||
|
||
|
||
def removeprefix(self: str, prefix: str, /) -> str: | ||
return self[len(prefix) :] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,161 @@ | ||
import asyncio | ||
from contextlib import suppress | ||
from typing import Awaitable, Callable, Type | ||
from unittest.mock import AsyncMock | ||
|
||
import aiohttp | ||
import pytest | ||
import pytest_asyncio | ||
from aiohttp import web | ||
from aiohttp.test_utils import TestServer | ||
from aiohttp.typedefs import StrOrURL | ||
from yarl import URL | ||
|
||
import pybotters | ||
from pybotters.helpers.gmocoin import GMOCoinHelper | ||
from pybotters.request import ClientRequest | ||
|
||
|
||
@pytest_asyncio.fixture | ||
async def pybotters_client( | ||
aiohttp_client_cls: Type[aiohttp.ClientSession], | ||
monkeypatch: pytest.MonkeyPatch, | ||
): | ||
# From https://github.com/aio-libs/pytest-aiohttp/blob/v1.0.4/pytest_aiohttp/plugin.py#L139 | ||
|
||
clients = [] | ||
|
||
async def go(app, **kwargs): | ||
server = TestServer(app) | ||
aiohttp_client = aiohttp_client_cls(server) | ||
aiohttp_client._session._request_class = ClientRequest | ||
|
||
await aiohttp_client.start_server() | ||
clients.append(aiohttp_client) | ||
|
||
def dummy_request(method: str, str_or_url: StrOrURL, **kwargs): | ||
return aiohttp_client._request(method, URL(str_or_url).path_qs, **kwargs) | ||
|
||
_pybotters_client = pybotters.Client(**kwargs) | ||
monkeypatch.setattr( | ||
_pybotters_client._session, | ||
_pybotters_client._session._request.__name__, | ||
dummy_request, | ||
) | ||
return _pybotters_client | ||
|
||
yield go | ||
|
||
while clients: | ||
await clients.pop().close() | ||
|
||
|
||
async def create_access_token(request: web.Request): | ||
return web.json_response( | ||
{ | ||
"status": 0, | ||
"data": "xxxxxxxxxxxxxxxxxxxx", | ||
"responsetime": "2019-03-19T02:15:06.102Z", | ||
} | ||
) | ||
|
||
|
||
async def extend_access_token(request: web.Request): | ||
return web.json_response( | ||
{ | ||
"status": 0, | ||
"responsetime": "2019-03-19T02:15:06.102Z", | ||
} | ||
) | ||
|
||
|
||
async def create_access_token_error(request: web.Request): | ||
return web.json_response( | ||
{ | ||
"status": 5, | ||
"messages": [ | ||
{ | ||
"message_code": "ERR-5201", | ||
"message_string": "MAINTENANCE. Please wait for a while", | ||
} | ||
], | ||
} | ||
) | ||
|
||
|
||
async def extend_access_token_error(request: web.Request): | ||
return web.json_response( | ||
{ | ||
"status": 1, | ||
"messages": [ | ||
{ | ||
"message_code": "ERR-5106", | ||
"message_string": "Invalid request parameter.", | ||
} | ||
], | ||
"responsetime": "2024-05-11T02:02:40.501Z", | ||
} | ||
) | ||
|
||
|
||
@pytest.mark.asyncio | ||
@pytest.mark.parametrize( | ||
"base_url, extend_access_handler, create_access_handler, expected_token", | ||
[ | ||
( | ||
"", | ||
extend_access_token, | ||
create_access_token, | ||
"aaaaaaaaaa", | ||
), | ||
( | ||
"https://api.coin.z.com", | ||
extend_access_token, | ||
create_access_token, | ||
"aaaaaaaaaa", | ||
), | ||
( | ||
"https://api.coin.z.com", | ||
extend_access_token_error, | ||
create_access_token, | ||
"xxxxxxxxxxxxxxxxxxxx", | ||
), | ||
( | ||
"https://api.coin.z.com", | ||
extend_access_token_error, | ||
create_access_token_error, | ||
"aaaaaaaaaa", | ||
), | ||
], | ||
) | ||
async def test_gmo_manage_ws_token( | ||
extend_access_handler: Callable[..., Awaitable[web.Response]], | ||
create_access_handler: Callable[..., Awaitable[web.Response]], | ||
pybotters_client: Callable[..., Awaitable[pybotters.Client]], | ||
base_url: str, | ||
expected_token: str, | ||
monkeypatch: pytest.MonkeyPatch, | ||
): | ||
async def sleep_canceller(delay): | ||
raise asyncio.CancelledError() | ||
|
||
app = web.Application() | ||
app.router.add_put("/private/v1/ws-auth", extend_access_handler) | ||
app.router.add_post("/private/v1/ws-auth", create_access_handler) | ||
client = await pybotters_client(app, base_url=base_url) | ||
|
||
token = "aaaaaaaaaa" | ||
url = f"wss://api.coin.z.com/ws/private/v1/{token}" | ||
m_ws = AsyncMock() | ||
m_ws.url = url | ||
|
||
helper = GMOCoinHelper(client) | ||
with monkeypatch.context() as m, suppress(asyncio.CancelledError): | ||
m.setattr(asyncio, asyncio.sleep.__name__, sleep_canceller) | ||
await helper.manage_ws_token( | ||
m_ws, | ||
token, | ||
300.0, | ||
) | ||
|
||
assert m_ws.url == f"wss://api.coin.z.com/ws/private/v1/{expected_token}" |