Merge pull request Nextdoor#53 from dickinsonm/separate-retry-failure
Separate task failure from retry and backoff delay
RKTMN committed Apr 16, 2020
2 parents 75293ea + a13e4ba commit 8780846
Showing 8 changed files with 78 additions and 30 deletions.
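At a glance, the change splits what used to be a single counter into two: task_retry_num continues to drive the backoff delay, while the new task_failure_num is the value compared against max_retries. Below is a minimal standalone sketch of the resulting requeue decision, paraphrasing the new handle_failure in kale/task.py; the max_retries default and the exponential delay formula here are illustrative assumptions, not the library's actual backoff.

def decide_requeue(retry_num, failure_num, max_retries=5,
                   increment_failure_num=True, max_delay_sec=300):
    """Sketch of the post-change requeue decision.

    Returns (requeue, next_retry_num, next_failure_num, delay_sec).
    """
    if failure_num >= max_retries:
        # Only accumulated failures can permanently drop a task now.
        return False, retry_num, failure_num, 0
    next_retry = retry_num + 1
    next_failure = failure_num + 1 if increment_failure_num else failure_num
    # Backoff still keys off the (pre-increment) retry count, so a task that
    # is retried without being counted as failed still backs off each time.
    delay_sec = min(2 ** retry_num, max_delay_sec)
    return True, next_retry, next_failure, delay_sec

# Two throttled retries that should not count as failures, then a real one.
print(decide_requeue(retry_num=0, failure_num=0, increment_failure_num=False))
print(decide_requeue(retry_num=1, failure_num=0, increment_failure_num=False))
print(decide_requeue(retry_num=2, failure_num=0, increment_failure_num=True))
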
10 changes: 9 additions & 1 deletion kale/message.py
@@ -34,6 +34,7 @@ def __init__(self,
task_id=None,
payload=None,
current_retry_num=None,
current_failure_num=None,
enqueued_time=None,
publisher_data=None,
instantiate_task=False,
@@ -47,6 +48,8 @@ def __init__(self,
:param payload: Payload holds the data that the task's run_task method will be called with.
:param current_retry_num: Current task retry count. This will be 0 for new tasks and will be
incremented for each retry.
:param current_failure_num: Current task failure count. This will be 0 for new tasks and will be
incremented for each failure.
:param enqueued_time: Timestamp of when message was queued. If not provided then value set
from setting's timestamp function.
:param publisher_data: Str containing information about the publisher. If not provided the
@@ -58,6 +61,7 @@ def __init__(self,

self._validate_task_payload(payload)
retry_count = current_retry_num or 0
failure_count = current_failure_num or 0

self.id = sqs_message_id
self.sqs_queue_name = sqs_queue_name
@@ -75,6 +79,7 @@ def __init__(self,
self.task_kwargs = payload.get('kwargs')
self.task_app_data = payload.get('app_data')
self.task_retry_num = retry_count
self.task_failure_num = failure_count
self._enqueued_time = enqueued_time or _get_current_timestamp()
self._publisher_data = publisher_data or _get_publisher_data()

@@ -116,6 +121,7 @@ def _get_message_body(self):
'_enqueued_time': self._enqueued_time,
'_publisher': self._publisher_data,
'retry_num': self.task_retry_num,
'failure_num': self.task_failure_num,
}
return message_body

@@ -162,6 +168,7 @@ def decode_sqs(cls, sqs_message):
enqueued_time=message_body.get('_enqueued_time'),
publisher_data=message_body.get('_publisher'),
current_retry_num=message_body.get('retry_num'),
current_failure_num=message_body.get('failure_num'),
instantiate_task=True,
delete_func=sqs_message.delete
)
@@ -187,7 +194,8 @@ def decode_str(cls, message_str):
payload=message_body.get('payload'),
enqueued_time=message_body.get('_enqueued_time'),
publisher_data=message_body.get('_publisher'),
current_retry_num=message_body.get('retry_num')
current_retry_num=message_body.get('retry_num'),
current_failure_num=message_body.get('failure_num')
)

return msg
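
For reference, the serialized SQS body produced by _get_message_body now carries both counters side by side. A sketch of the fields visible in this diff, with made-up values (other keys elided):

# Illustrative message body after this change; values are hypothetical.
message_body = {
    'payload': {'args': [], 'kwargs': {}, 'app_data': {}},
    '_enqueued_time': 1587081600,   # epoch timestamp (made up)
    '_publisher': 'worker-host-1',  # hypothetical publisher string
    'retry_num': 3,                 # drives the backoff delay
    'failure_num': 1,               # compared against max_retries
}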
6 changes: 4 additions & 2 deletions kale/publisher.py
@@ -17,14 +17,15 @@ class Publisher(sqs.SQSTalk):
"""Class to manage publishing SQS tasks."""

def publish(self, task_class, task_id, payload,
current_retry_num=None, delay_sec=None):
current_retry_num=None, current_failure_num=None, delay_sec=None):
"""Publish the given task type to the queue with the provided payload.
:param obj task_class: class of the task that we are publishing.
:param str task_id: unique identifying string for this task.
:param dict payload: dictionary for the task payload.
:param int current_retry_num: current task retry count. If 0, this is
the first attempt to run the task.
:param int current_failure_num: current task failure count.
:param int delay_sec: time (in seconds) that a task should stay
in the queue before being released to consumers.
:raises: TaskTooChubbyException: This task is outrageously chubby.
@@ -53,7 +54,8 @@ def publish(self, task_class, task_id, payload,
task_class=task_class,
task_id=task_id,
payload=payload,
current_retry_num=current_retry_num)
current_retry_num=current_retry_num,
current_failure_num=current_failure_num)

sqs_queue.send_message(
MessageBody=kale_msg.encode(),
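As a usage sketch (assuming an already-configured SQS environment; MyTask, the task id, and the payload are hypothetical), a caller requeuing a task can now pass both counters explicitly, while callers that omit current_failure_num keep the old behavior since it defaults to None:

from kale import publisher  # package layout as shown in this diff

pub = publisher.Publisher()
pub.publish(
    MyTask,                                      # hypothetical Task subclass
    'task-abc-123',                              # task_id
    {'args': [], 'kwargs': {}, 'app_data': {}},  # payload
    current_retry_num=2,    # retry counter carried on the message
    current_failure_num=1,  # only one attempt actually counted as a failure
    delay_sec=30)
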
17 changes: 9 additions & 8 deletions kale/task.py
@@ -112,7 +112,7 @@ def publish(cls, app_data, *args, **kwargs):

@classmethod
def _get_delay_sec_for_retry(cls, current_retry_num):
"""Generate a delay based on the number of times the task has failed.
"""Generate a delay based on the number of times the task has retried.
:param int current_retry_num: Task retry count for the task that is
about to be published.
@@ -124,14 +124,14 @@ def _get_delay_sec_for_retry(cls, current_retry_num):
settings.SQS_MAX_TASK_DELAY_SEC)

@classmethod
def handle_failure(cls, message, raised_exception, increment_retry_num=True):
def handle_failure(cls, message, raised_exception, increment_failure_num=True):
"""Logic to respond to task failure.
:param KaleMessage message: instance of KaleMessage containing the
task that failed.
:param Exception raised_exception: exception that the failed task
raised.
:param increment_retry_num: boolean whether the failure should increment
:param increment_failure_num: boolean indicating whether this failure should increment
the failure count.
:return: True if the task will be retried, False otherwise.
:rtype: boolean
@@ -158,7 +158,7 @@ def handle_failure(cls, message, raised_exception, increment_retry_num=True):
return False

# Monitor retries and dropped tasks
if message.task_retry_num >= cls.max_retries:
if message.task_failure_num >= cls.max_retries:
cls._report_permanent_failure(
message, raised_exception,
PERMANENT_FAILURE_RETRIES_EXCEEDED, False)
@@ -169,14 +169,15 @@ def handle_failure(cls, message, raised_exception, increment_retry_num=True):
'kwargs': message.task_kwargs,
'app_data': message.task_app_data}

retry_count = message.task_retry_num
if increment_retry_num:
retry_count = retry_count + 1
retry_count = message.task_retry_num + 1
failure_count = message.task_failure_num
if increment_failure_num:
failure_count += 1
delay_sec = cls._get_delay_sec_for_retry(message.task_retry_num)
pub = cls._get_publisher()
pub.publish(
cls, message.task_id, payload,
current_retry_num=retry_count, delay_sec=delay_sec)
current_retry_num=retry_count, current_failure_num=failure_count, delay_sec=delay_sec)
return True

def run(self, *args, **kwargs):
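One intended use of the new increment_failure_num flag, as a sketch (MyTask and ThrottledError are hypothetical names): transient conditions can be retried with backoff without consuming one of the task's max_retries failures, while genuine errors still count toward the permanent-failure cutoff.

# Hypothetical worker-side handling built on the split counters.
try:
    MyTask().run(*message.task_args, **message.task_kwargs)
except ThrottledError as exc:
    # Back off and retry, but do not count this attempt as a failure.
    MyTask.handle_failure(message, exc, increment_failure_num=False)
except Exception as exc:
    # Counts toward max_retries; the task is dropped once failures run out.
    MyTask.handle_failure(message, exc)
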
3 changes: 2 additions & 1 deletion kale/test_utils.py
@@ -61,7 +61,7 @@ class FailTaskNoRetries(FailTask):
class MockMessage(message.KaleMessage):

def __init__(self, task_inst, task_args=None, task_kwargs=None, app_data=None,
retry_num=0):
retry_num=0, failure_num=0):
"""Instantiate a mock KaleMessage.
Args:
@@ -74,6 +74,7 @@ def __init__(self, task_inst, task_args=None, task_kwargs=None, app_data=None,
self.task_kwargs = task_kwargs or {}
self.task_app_data = app_data or {}
self.task_retry_num = retry_num
self.task_failure_num = failure_num
self.task_inst = task_inst


2 changes: 2 additions & 0 deletions kale/tests/test_message.py
@@ -44,6 +44,7 @@ def test_message(self, mock_get_current_timestamp):
self.assertEqual('kale.task.Task', kale_msg.task_name)
self.assertEqual(123, kale_msg._enqueued_time)
self.assertEqual(0, kale_msg.task_retry_num)
self.assertEqual(0, kale_msg.task_failure_num)
self.assertEqual(1, kale_msg.task_id)
self.assertEqual([], kale_msg.task_args)
self.assertEqual({}, kale_msg.task_kwargs)
@@ -109,6 +110,7 @@ def test_decode(self):
self.assertEqual('kale.task.Task', kale_msg.task_name)
self.assertEqual(123, kale_msg._enqueued_time)
self.assertEqual(0, kale_msg.task_retry_num)
self.assertEqual(0, kale_msg.task_failure_num)
self.assertEqual(1, kale_msg.task_id)
self.assertEqual([], kale_msg.task_args)
self.assertEqual({}, kale_msg.task_kwargs)
47 changes: 39 additions & 8 deletions kale/tests/test_task.py
@@ -116,7 +116,7 @@ def testTaskRetriesExceeded(self):

task_inst = test_utils.new_mock_task(task_class=test_utils.FailTask)
message = test_utils.MockMessage(
task_inst, retry_num=test_utils.FailTask.max_retries)
task_inst, failure_num=test_utils.FailTask.max_retries)

with mock.patch(
'kale.task.Task._report_permanent_failure') as fail_func:
@@ -131,7 +131,7 @@ def testTaskRetries(self):

task_inst = test_utils.new_mock_task(task_class=test_utils.FailTask)
message = test_utils.MockMessage(
task_inst, retry_num=test_utils.FailTask.max_retries)
task_inst, failure_num=test_utils.FailTask.max_retries)

with mock.patch(
'kale.task.Task._report_permanent_failure') as fail_func:
@@ -141,8 +141,8 @@ def testTaskRuntimeExceeded(self):
fail_func.assert_called_once_with(
message, exc, task.PERMANENT_FAILURE_RETRIES_EXCEEDED, False)

def testTaskRuntimeExceeded(self):
"""Task task failing from timeout."""
def testTaskRetryDelayWithoutFailure(self):
"""Task task failing with delay without failure"""

task_inst = test_utils.new_mock_task(task_class=test_utils.FailTask)
sample_values = [
@@ -159,17 +159,48 @@ def testTaskRuntimeExceeded(self):
message = test_utils.MockMessage(task_inst, retry_num=retry)

retried = test_utils.FailTask.handle_failure(
message, exceptions.TaskException('Exception'))
message, exceptions.TaskException('Exception'), increment_failure_num=False)
self.assertTrue(retried)
publish_func.assert_called_once_with(
test_utils.FailTask, message.task_id, payload,
current_retry_num=(retry + 1), delay_sec=delay_sec)
current_failure_num=0, current_retry_num=(retry + 1),
delay_sec=delay_sec)

def testTaskRetryDelayWithFailure(self):
"""Task task retrying with delay with failure"""

task_inst = test_utils.new_mock_task(task_class=test_utils.FailTask)
sample_values = [
(i, test_utils.FailTask._get_delay_sec_for_retry(i)) for i in
range(task_inst.max_retries)]
payload = {
'args': [],
'kwargs': {},
'app_data': {}}

for failure, delay_sec in sample_values:
with mock.patch(
'kale.publisher.Publisher.publish') as publish_func:
message = test_utils.MockMessage(task_inst, failure_num=failure, retry_num=failure)

retried = test_utils.FailTask.handle_failure(
message, exceptions.TaskException('Exception'), increment_failure_num=True)
self.assertTrue(retried)
publish_func.assert_called_once_with(
test_utils.FailTask, message.task_id, payload,
current_failure_num=(failure + 1), current_retry_num=(failure + 1),
delay_sec=delay_sec)

def testTaskRuntimeExceeded(self):
"""Task task failing from timeout."""

task_inst = test_utils.new_mock_task(task_class=test_utils.FailTask)

retry = retry + 1
with mock.patch(
'kale.task.Task._report_permanent_failure') as fail_func:
exc = exceptions.TaskException('Exception')
message = test_utils.MockMessage(task_inst, retry_num=retry)
message = test_utils.MockMessage(task_inst, retry_num=0,
failure_num=task_inst.max_retries + 1)
retried = test_utils.FailTask.handle_failure(message, exc)
self.assertFalse(retried)
fail_func.assert_called_once_with(
2 changes: 1 addition & 1 deletion kale/version.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = '1.0.2' # http://semver.org/
__version__ = '2.0.0' # http://semver.org/
21 changes: 12 additions & 9 deletions setup.py
@@ -73,18 +73,21 @@ def maybe_rm(path):
keywords='kale nextdoor taskworker sqs python',
packages=['kale'],
tests_require=[
'mock',
'nose',
'moto'
'mock==2.0.0',
'nose==1.3.7',
'moto==1.3.15.dev122',
'MarkupSafe==1.1.1',
'Jinja2==2.10.3',
'zipp==0.6.0',
],
test_suite='nose.collector',
install_requires=[
'boto3',
'pycryptodome',
'pyyaml',
'setuptools',
'six',
'future',
'boto3==1.10.36',
'pycryptodome==3.6.1',
'pyyaml==5.2',
'setuptools==40.8.0',
'six==1.11.0',
'future==0.18.2',
],
classifiers=classifiers,
cmdclass={'clean': CleanHook},
