Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto expiry for DynamoDB backend #5805

Merged
merged 13 commits into from Nov 25, 2019
224 changes: 222 additions & 2 deletions celery/backends/dynamodb.py
Expand Up @@ -51,9 +51,16 @@ class DynamoDBBackend(KeyValueStoreBackend):
#: The endpoint URL that is passed to boto3 (local DynamoDB) (`default`)
endpoint_url = None

#: Item time-to-live in seconds (`default`)
time_to_live_seconds = None

# DynamoDB supports Time to Live as an auto-expiry mechanism.
supports_autoexpire = True

_key_field = DynamoDBAttribute(name='id', data_type='S')
_value_field = DynamoDBAttribute(name='result', data_type='B')
_timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N')
_ttl_field = DynamoDBAttribute(name='ttl', data_type='N')
georgepsarakis marked this conversation as resolved.
Show resolved Hide resolved
_available_fields = None

def __init__(self, url=None, table_name=None, *args, **kwargs):
Expand Down Expand Up @@ -118,6 +125,18 @@ def __init__(self, url=None, table_name=None, *args, **kwargs):
self.write_capacity_units
)
)

ttl = query.get('ttl_seconds', self.time_to_live_seconds)
if ttl:
try:
self.time_to_live_seconds = int(ttl)
except ValueError as e:
logger.error(
'TTL must be a number; got "{ttl}"',
exc_info=e
)
raise e

self.table_name = table or self.table_name

self._available_fields = (
Expand Down Expand Up @@ -153,6 +172,11 @@ def _get_client(self, access_key_id=None, secret_access_key=None):
**client_parameters
)
self._get_or_create_table()

if self._has_ttl() is not None:
self._validate_ttl_methods()
self._set_table_ttl()

return self._client

def _get_table_schema(self):
Expand Down Expand Up @@ -206,6 +230,193 @@ def _get_or_create_table(self):
else:
raise e

def _has_ttl(self):
"""Return the desired Time to Live config.

- True: Enable TTL on the table; use expiry.
- False: Disable TTL on the table; don't use expiry.
- None: Ignore TTL on thetable; don't use expiry.
"""

return None if self.time_to_live_seconds is None \
else self.time_to_live_seconds >= 0

def _validate_ttl_methods(self):
"""Verify boto support for the DynamoDB Time to Live methods."""

# Required TTL methods.
required_methods = (
'update_time_to_live',
'describe_time_to_live',
)

# Find missing methods.
missing_methods = []
for method in list(required_methods):
if not hasattr(self._client, method):
missing_methods.append(method)

if missing_methods:
logger.error(
(
'boto3 method(s) {methods} not found; ensure that '
'boto3>=1.9.178 and botocore>=1.12.178 are installed'
).format(
methods=','.join(missing_methods)
)
)
raise AttributeError(
'boto3 method(s) {methods} not found'.format(
methods=','.join(missing_methods)
)
)

def _get_ttl_specification(self, ttl_attr_name):
"""Get the boto3 structure describing the DynamoDB TTL specification."""

return {
'TableName': self.table_name,
'TimeToLiveSpecification': {
'Enabled': self._has_ttl(),
'AttributeName': ttl_attr_name
}
}

def _get_table_ttl_description(self):
# Get the current TTL description.
try:
description = self._client.describe_time_to_live(
TableName=self.table_name
)
status = description['TimeToLiveDescription']['TimeToLiveStatus']
except ClientError as e:
error_code = e.response['Error'].get('Code', 'Unknown')
error_message = e.response['Error'].get('Message', 'Unknown')
logger.error((
'Error describing Time to Live on DynamoDB table {table}: '
'{code}: {message}'
).format(
table=self.table_name,
code=error_code,
message=error_message,
))
raise e

return description

def _set_table_ttl(self):
    """Enable or disable Time to Live on the table.

    Reads the table's current TTL status and compares it with the desired
    state reported by ``_has_ttl()``. When the table is already in (or
    moving to) the desired state, nothing is changed; otherwise
    ``update_time_to_live`` is called.

    Returns:
        The ``describe_time_to_live`` response when no update is needed,
        otherwise the ``update_time_to_live`` response.

    Raises:
        ClientError: Re-raised after logging when the update call fails.
    """
    # Get the table TTL description, and return early when possible.
    description = self._get_table_ttl_description()
    ttl_description = description['TimeToLiveDescription']
    status = ttl_description['TimeToLiveStatus']
    ttl_wanted = self._has_ttl()

    cur_attr_name = None
    if status in ('ENABLED', 'ENABLING'):
        cur_attr_name = ttl_description['AttributeName']
        if ttl_wanted and cur_attr_name == self._ttl_field.name:
            # We want TTL enabled, and it is currently enabled or being
            # enabled, and on the correct attribute.
            logger.debug((
                'DynamoDB Time to Live is {situation} '
                'on table {table}'
            ).format(
                situation=(
                    'already enabled' if status == 'ENABLED'
                    else 'currently being enabled'
                ),
                table=self.table_name
            ))
            return description

    # This has to stay ``elif``: the branch above does not always return,
    # and the trailing ``else`` must only fire for statuses outside the
    # four known values, not for an ENABLED/ENABLING table that falls
    # through to the update below.
    elif status in ('DISABLED', 'DISABLING'):
        if not ttl_wanted:
            # We want TTL disabled, and it is currently disabled or being
            # disabled.
            logger.debug((
                'DynamoDB Time to Live is {situation} '
                'on table {table}'
            ).format(
                situation=(
                    'already disabled' if status == 'DISABLED'
                    else 'currently being disabled'
                ),
                table=self.table_name
            ))
            return description

    # The state shouldn't ever have any value beyond the four handled
    # above, but to ease troubleshooting of potential future changes, emit
    # a log showing the unknown state.
    else:  # pragma: no cover
        logger.warning((
            'Unknown DynamoDB Time to Live status {status} '
            'on table {table}. Attempting to continue.'
        ).format(
            status=status,
            table=self.table_name
        ))

    # At this point, we have one of the following situations:
    #
    # We want TTL enabled,
    #
    # - and it's currently disabled: Try to enable.
    #
    # - and it's being disabled: Try to enable, but this is almost sure to
    #   raise ValidationException with message:
    #
    #     Time to live has been modified multiple times within a fixed
    #     interval
    #
    # - and it's currently enabled or being enabled, but on the wrong
    #   attribute: Try to enable, but this will raise ValidationException
    #   with message:
    #
    #     TimeToLive is active on a different AttributeName: current
    #     AttributeName is ttlx
    #
    # We want TTL disabled,
    #
    # - and it's currently enabled: Try to disable.
    #
    # - and it's being enabled: Try to disable, but this is almost sure to
    #   raise ValidationException with message:
    #
    #     Time to live has been modified multiple times within a fixed
    #     interval
    #
    # When TTL is fully enabled, the request names the attribute it is
    # currently enabled on; in every other state it names our own field.
    attr_name = \
        cur_attr_name if status == 'ENABLED' else self._ttl_field.name
    try:
        specification = self._client.update_time_to_live(
            **self._get_ttl_specification(
                ttl_attr_name=attr_name
            )
        )
    except ClientError as e:
        error_code = e.response['Error'].get('Code', 'Unknown')
        error_message = e.response['Error'].get('Message', 'Unknown')
        logger.error((
            'Error {action} Time to Live on DynamoDB table {table}: '
            '{code}: {message}'
        ).format(
            action='enabling' if ttl_wanted else 'disabling',
            table=self.table_name,
            code=error_code,
            message=error_message,
        ))
        raise e
    else:
        # Log the attribute that was actually sent in the request, which
        # may differ from ``self._ttl_field.name``.
        logger.info(
            (
                'DynamoDB table Time to Live updated: '
                'table={table} enabled={enabled} attribute={attr}'
            ).format(
                table=self.table_name,
                enabled=ttl_wanted,
                attr=attr_name
            )
        )
        return specification

def _wait_for_table_status(self, expected='ACTIVE'):
"""Poll for the expected table status."""
achieved_state = False
Expand Down Expand Up @@ -236,7 +447,8 @@ def _prepare_get_request(self, key):

def _prepare_put_request(self, key, value):
"""Construct the item creation request parameters."""
return {
timestamp = time()
put_request = {
'TableName': self.table_name,
'Item': {
self._key_field.name: {
Expand All @@ -246,10 +458,18 @@ def _prepare_put_request(self, key, value):
self._value_field.data_type: value
},
self._timestamp_field.name: {
self._timestamp_field.data_type: str(time())
self._timestamp_field.data_type: str(timestamp)
}
}
}
if self._has_ttl():
put_request['Item'].update({
self._ttl_field.name: {
self._ttl_field.data_type:
str(int(timestamp + self.time_to_live_seconds))
}
})
return put_request

def _item_to_dict(self, raw_response):
"""Convert get_item() response to field-value pairs."""
Expand Down
12 changes: 12 additions & 0 deletions docs/userguide/configuration.rst
Expand Up @@ -1572,6 +1572,18 @@ The fields of the DynamoDB URL in ``result_backend`` are defined as follows:
The Read & Write Capacity Units for the created DynamoDB table. Default is ``1`` for both read and write.
More details can be found in the `Provisioned Throughput documentation <http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html>`_.

#. ``ttl_seconds``
thedrow marked this conversation as resolved.
Show resolved Hide resolved

Time-to-live (in seconds) for results before they expire. By default,
results do not expire and the DynamoDB table's Time to Live settings are
left untouched. If ``ttl_seconds`` is set to a positive value, results
will expire after the specified number of seconds. If ``ttl_seconds`` is
set to a negative value, results do not expire and the DynamoDB table's
Time to Live setting is actively disabled. Note that trying to
change a table's Time to Live setting multiple times in quick succession
will cause a throttling error. More details can be found in the
`DynamoDB TTL documentation <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html>`_.

.. _conf-ironcache-result-backend:

IronCache backend settings
Expand Down
2 changes: 1 addition & 1 deletion requirements/extras/dynamodb.txt
@@ -1 +1 @@
boto3>=1.9.125
boto3>=1.9.178