Skip to content

Commit

Permalink
fix(remote runner): disable retries by default (#3155)
Browse files Browse the repository at this point in the history
* disable retries unless under feature flag

* fixes

* add aiohttp import

* Update src/bentoml/_internal/runner/runner_handle/remote.py

Co-authored-by: Sean Sheng <s3sheng@gmail.com>

Co-authored-by: Sean Sheng <s3sheng@gmail.com>
  • Loading branch information
sauyon and ssheng committed Oct 31, 2022
1 parent 607cca2 commit 541b533
Showing 1 changed file with 27 additions and 17 deletions.
44 changes: 27 additions & 17 deletions src/bentoml/_internal/runner/runner_handle/remote.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import json
import pickle
import typing as t
Expand All @@ -21,6 +22,7 @@

if TYPE_CHECKING:
import yarl
import aiohttp
from aiohttp import BaseConnector
from aiohttp.client import ClientSession

Expand Down Expand Up @@ -126,10 +128,10 @@ def strip_query_params(url: yarl.URL) -> str:
)
return self._client_cache

def _reset_client(self):
async def _reset_client(self):
self._close_conn()
if self._client_cache is not None:
self._client_cache.close()
await self._client_cache.close()
self._client_cache = None

async def async_run_method(
Expand Down Expand Up @@ -168,21 +170,29 @@ async def async_run_method(
},
) as resp:
body = await resp.read()
except aiohttp.ClientOSError:
# most likely the TCP connection has been closed; retry after reconnecting
self._reset_client()
async with self._client.post(
f"{self._addr}/{path}",
data=pickle.dumps(payload_params), # FIXME: pickle inside pickle
headers={
"Bento-Name": component_context.bento_name,
"Bento-Version": component_context.bento_version,
"Runner-Name": self._runner.name,
"Yatai-Bento-Deployment-Name": component_context.yatai_bento_deployment_name,
"Yatai-Bento-Deployment-Namespace": component_context.yatai_bento_deployment_namespace,
},
) as resp:
body = await resp.read()
except aiohttp.ClientOSError as e:
if os.getenv("BENTOML_RETRY_RUNNER_REQUESTS").lower() == "true":
try:
# most likely the TCP connection has been closed; retry after reconnecting
await self._reset_client()
async with self._client.post(
f"{self._addr}/{path}",
data=pickle.dumps(
payload_params
), # FIXME: pickle inside pickle
headers={
"Bento-Name": component_context.bento_name,
"Bento-Version": component_context.bento_version,
"Runner-Name": self._runner.name,
"Yatai-Bento-Deployment-Name": component_context.yatai_bento_deployment_name,
"Yatai-Bento-Deployment-Namespace": component_context.yatai_bento_deployment_namespace,
},
) as resp:
body = await resp.read()
except aiohttp.ClientOSError as e:
raise RemoteException(f"Failed to connect to runner server.")
else:
raise RemoteException(f"Failed to connect to runner server.") from e

try:
content_type = resp.headers["Content-Type"]
Expand Down

0 comments on commit 541b533

Please sign in to comment.