Allow retries for statuses other than 429 in bulk streaming #2071

Status: Open · wants to merge 2 commits into main
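A short usage sketch of what this change enables (not part of the diff; the endpoint, index name, and callback below are illustrative, and ``retry_for_status_callback`` is the parameter added by this PR). Here a caller retries transient 503 responses in addition to the default 429:

```python
from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")  # illustrative endpoint


def retry_throttled_or_unavailable(status: int) -> bool:
    # Retry throttling (429) as well as transient unavailability (503).
    return status in (429, 503)


docs = ({"_index": "my-index", "value": i} for i in range(1000))  # illustrative docs

for ok, item in helpers.streaming_bulk(
    client,
    docs,
    max_retries=3,
    initial_backoff=2,
    retry_for_status_callback=retry_throttled_or_unavailable,
    raise_on_error=False,
):
    if not ok:
        print("failed:", item)
```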
35 changes: 25 additions & 10 deletions elasticsearch/_async/helpers.py
@@ -158,6 +158,12 @@ async def azip(
pass


def _retry_for_status(status: int) -> bool:
if status == 429:
return True
return False


async def async_streaming_bulk(
client: AsyncElasticsearch,
actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
@@ -167,6 +173,7 @@ async def async_streaming_bulk(
expand_action_callback: Callable[
[_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
] = expand_action,
retry_for_status_callback: Callable[[int], bool] = _retry_for_status,
raise_on_exception: bool = True,
max_retries: int = 0,
initial_backoff: float = 2,
@@ -185,10 +192,11 @@
entire input is consumed and sent.

If you specify ``max_retries`` it will also retry any documents that were
rejected with a ``429`` status code. To do this it will wait (**by calling
asyncio.sleep**) for ``initial_backoff`` seconds and then,
every subsequent rejection for the same chunk, for double the time every
time up to ``max_backoff`` seconds.
rejected with a ``429`` status code. Use ``retry_for_status_callback`` to
configure which status codes will be retried. To do this it will wait
(**by calling asyncio.sleep**) for ``initial_backoff`` seconds
and then, every subsequent rejection for the same chunk, for double the time
every time up to ``max_backoff`` seconds.

:arg client: instance of :class:`~elasticsearch.AsyncElasticsearch` to use
:arg actions: iterable or async iterable containing the actions to be executed
@@ -201,8 +209,12 @@
:arg expand_action_callback: callback executed on each action passed in,
should return a tuple containing the action line and the data line
(`None` if data line should be omitted).
:arg retry_for_status_callback: callback executed on each item's status,
should return ``True`` if the status requires a retry and ``False``
otherwise (by default only a ``429`` status is retried).
:arg max_retries: maximum number of times a document will be retried when
``429`` is received, set to 0 (default) for no retries on ``429``
a status for which ``retry_for_status_callback`` returns ``True``
(by default ``429``) is received, set to 0 (default) for no retries
:arg initial_backoff: number of seconds we should wait before the first
retry. Any subsequent retries will be powers of ``initial_backoff *
2**retry_number``
@@ -267,11 +279,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:

if not ok:
action, info = info.popitem()
# retry if retries enabled, we get 429, and we are not
# in the last attempt
# retry if retries enabled, we are not in the last attempt,
# and retry_for_status_callback returns True for the status (429 by default)
if (
max_retries
and info["status"] == 429
and retry_for_status_callback(info["status"])
and (attempt + 1) <= max_retries
):
# _process_bulk_chunk expects strings so we need to
@@ -284,8 +296,11 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
yield ok, info

except ApiError as e:
# suppress 429 errors since we will retry them
if attempt == max_retries or e.status_code != 429:
# suppress any status for which retry_for_status_callback
# returns True (429 by default) since we will retry it
if attempt == max_retries or not retry_for_status_callback(
e.status_code
):
raise
else:
if not to_retry:
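The async helper gains the same parameter; a corresponding sketch (again illustrative: the endpoint and index are placeholders, and ``async_streaming_bulk`` needs the client's async extras installed). The callback retries a 522 just like the new tests further down:

```python
import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_streaming_bulk


def retry_throttled_or_timed_out(status: int) -> bool:
    # 522 mirrors the status used in the new tests; 429 keeps the default behaviour.
    return status in (429, 522)


async def main() -> None:
    client = AsyncElasticsearch("http://localhost:9200")  # placeholder endpoint
    docs = ({"_index": "my-index", "value": i} for i in range(100))
    async for ok, item in async_streaming_bulk(
        client,
        docs,
        max_retries=2,
        initial_backoff=1,
        retry_for_status_callback=retry_throttled_or_timed_out,
        raise_on_error=False,
    ):
        if not ok:
            print("failed:", item)
    await client.close()


asyncio.run(main())
```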
35 changes: 25 additions & 10 deletions elasticsearch/helpers/actions.py
@@ -355,6 +355,12 @@ def _process_bulk_chunk(
yield from gen


def _retry_for_status(status: int) -> bool:
if status == 429:
return True
return False


def streaming_bulk(
client: Elasticsearch,
actions: Iterable[_TYPE_BULK_ACTION],
@@ -364,6 +370,7 @@ def streaming_bulk(
expand_action_callback: Callable[
[_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
] = expand_action,
retry_for_status_callback: Callable[[int], bool] = _retry_for_status,
raise_on_exception: bool = True,
max_retries: int = 0,
initial_backoff: float = 2,
@@ -382,10 +389,11 @@
entire input is consumed and sent.

If you specify ``max_retries`` it will also retry any documents that were
rejected with a ``429`` status code. To do this it will wait (**by calling
time.sleep which will block**) for ``initial_backoff`` seconds and then,
every subsequent rejection for the same chunk, for double the time every
time up to ``max_backoff`` seconds.
rejected with a ``429`` status code. Use ``retry_for_status_callback`` to
configure which status codes will be retried. To do this it will wait
(**by calling time.sleep which will block**) for ``initial_backoff`` seconds
and then, every subsequent rejection for the same chunk, for double the time
every time up to ``max_backoff`` seconds.

:arg client: instance of :class:`~elasticsearch.Elasticsearch` to use
:arg actions: iterable containing the actions to be executed
@@ -398,8 +406,12 @@
:arg expand_action_callback: callback executed on each action passed in,
should return a tuple containing the action line and the data line
(`None` if data line should be omitted).
:arg retry_for_status_callback: callback executed on each item's status,
should return ``True`` if the status requires a retry and ``False``
otherwise (by default only a ``429`` status is retried).
:arg max_retries: maximum number of times a document will be retried when
``429`` is received, set to 0 (default) for no retries on ``429``
a status for which ``retry_for_status_callback`` returns ``True``
(by default ``429``) is received, set to 0 (default) for no retries
:arg initial_backoff: number of seconds we should wait before the first
retry. Any subsequent retries will be powers of ``initial_backoff *
2**retry_number``
@@ -451,11 +463,11 @@ def streaming_bulk(

if not ok:
action, info = info.popitem()
# retry if retries enabled, we get 429, and we are not
# in the last attempt
# retry if retries enabled, we are not in the last attempt,
# and retry_for_status_callback returns True for the status (429 by default)
if (
max_retries
and info["status"] == 429
and retry_for_status_callback(info["status"])
and (attempt + 1) <= max_retries
):
# _process_bulk_chunk expects bytes so we need to
Expand All @@ -468,8 +480,11 @@ def streaming_bulk(
yield ok, info

except ApiError as e:
# suppress 429 errors since we will retry them
if attempt == max_retries or e.status_code != 429:
# suppress any status for which retry_for_status_callback
# returns True (429 by default) since we will retry it
if attempt == max_retries or not retry_for_status_callback(
e.status_code
):
raise
else:
if not to_retry:
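For reference, both docstrings describe the backoff as powers of two starting from ``initial_backoff`` and capped at ``max_backoff``; a rough sketch of that schedule (the exact exponent offset inside the helpers may differ by one):

```python
# Rough backoff schedule implied by the docstrings: the wait doubles on every
# retried rejection of the same chunk, capped at max_backoff.
initial_backoff, max_backoff, max_retries = 2.0, 600.0, 5

for retry_number in range(max_retries):
    delay = min(max_backoff, initial_backoff * 2**retry_number)
    print(f"retry {retry_number + 1}: wait {delay:g}s")  # 2, 4, 8, 16, 32
```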
44 changes: 44 additions & 0 deletions test_elasticsearch/test_async/test_server/test_helpers.py
@@ -292,6 +292,50 @@ async def streaming_bulk():
await streaming_bulk()
assert 4 == failing_client._called

async def test_connection_timeout_is_retried_with_retry_status_callback(
self, async_client
):
failing_client = FailingBulkClient(
async_client,
fail_with=ApiError(
message="Connection timed out!",
body={},
meta=ApiResponseMeta(
status=522, headers={}, http_version="1.1", duration=0, node=None
),
),
)
docs = [
{"_index": "i", "_id": 47, "f": "v"},
{"_index": "i", "_id": 45, "f": "v"},
{"_index": "i", "_id": 42, "f": "v"},
]

def _retry_for_connection_timeout(status):
if status == 522:
return True
return False

results = [
x
async for x in helpers.async_streaming_bulk(
failing_client,
docs,
raise_on_exception=False,
raise_on_error=False,
chunk_size=1,
retry_for_status_callback=_retry_for_connection_timeout,
max_retries=1,
initial_backoff=0,
)
]
assert 3 == len(results)
assert [True, True, True] == [r[0] for r in results]
await async_client.indices.refresh(index="i")
res = await async_client.search(index="i")
assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
assert 4 == failing_client._called


class TestBulk(object):
async def test_bulk_works_with_single_item(self, async_client):
44 changes: 44 additions & 0 deletions test_elasticsearch/test_server/test_helpers.py
@@ -288,6 +288,50 @@ def streaming_bulk():
assert 4 == failing_client._called


def test_connection_timeout_is_retried_with_retry_status_callback(sync_client):
failing_client = FailingBulkClient(
sync_client,
fail_with=ApiError(
message="Connection timed out!",
body={},
meta=ApiResponseMeta(
status=522, headers={}, http_version="1.1", duration=0, node=None
),
),
)
docs = [
{"_index": "i", "_id": 47, "f": "v"},
{"_index": "i", "_id": 45, "f": "v"},
{"_index": "i", "_id": 42, "f": "v"},
]

def _retry_for_connection_timeout(status):
if status == 522:
return True
return False

results = list(
helpers.streaming_bulk(
failing_client,
docs,
index="i",
raise_on_exception=False,
raise_on_error=False,
chunk_size=1,
retry_for_status_callback=_retry_for_connection_timeout,
max_retries=1,
initial_backoff=0,
)
)
assert 3 == len(results)
assert [True, True, True] == [r[0] for r in results]
sync_client.indices.refresh(index="i")
res = sync_client.search(index="i")
assert {"value": 3, "relation": "eq"} == res["hits"]["total"]
assert 4 == failing_client._called


def test_bulk_works_with_single_item(sync_client):
docs = [{"answer": 42, "_id": 1}]
success, failed = helpers.bulk(sync_client, docs, index="test-index", refresh=True)