Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto expiry for DynamoDB backend #5805

Merged
merged 13 commits into from Nov 25, 2019
182 changes: 180 additions & 2 deletions celery/backends/dynamodb.py
Expand Up @@ -51,9 +51,16 @@ class DynamoDBBackend(KeyValueStoreBackend):
#: The endpoint URL that is passed to boto3 (local DynamoDB) (`default`)
endpoint_url = None

#: Item time-to-live in seconds (`default`)
time_to_live_seconds = None

# DynamoDB supports Time to Live as an auto-expiry mechanism.
supports_autoexpire = True

_key_field = DynamoDBAttribute(name='id', data_type='S')
_value_field = DynamoDBAttribute(name='result', data_type='B')
_timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N')
_ttl_field = DynamoDBAttribute(name='ttl', data_type='N')
georgepsarakis marked this conversation as resolved.
Show resolved Hide resolved
_available_fields = None

def __init__(self, url=None, table_name=None, *args, **kwargs):
Expand Down Expand Up @@ -118,6 +125,17 @@ def __init__(self, url=None, table_name=None, *args, **kwargs):
self.write_capacity_units
)
)

try:
thedrow marked this conversation as resolved.
Show resolved Hide resolved
self.time_to_live_seconds = int(
query.get(
'ttl_seconds',
self.time_to_live_seconds
)
)
except Exception:
pass

self.table_name = table or self.table_name

self._available_fields = (
Expand Down Expand Up @@ -153,6 +171,7 @@ def _get_client(self, access_key_id=None, secret_access_key=None):
**client_parameters
)
self._get_or_create_table()
self._set_table_ttl()
return self._client

def _get_table_schema(self):
Expand Down Expand Up @@ -206,6 +225,156 @@ def _get_or_create_table(self):
else:
raise e

def _has_ttl(self):
"""Return the desired Time to Live config; True means enabled."""
return self.time_to_live_seconds is not None

def _get_ttl_specification(self, ttl_attr_name):
"""Get the boto3 structure describing the DynamoDB TTL specification."""
return {
'TableName': self.table_name,
'TimeToLiveSpecification': {
'Enabled': self._has_ttl(),
'AttributeName': ttl_attr_name
}
}

def _set_table_ttl(self):
"""Enable or disable Time to Live on the table."""

# Verify that the client supports the TTL methods.
for method in ('update_time_to_live', 'describe_time_to_live'):
if not hasattr(self._client, method):
message = (
"boto3 method '{method}' not found; ensure that "
"boto3>=1.9.178 and botocore>=1.12.178 are installed"
).format(method=method)
if not self._has_ttl():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't this part stand on its own as a guard? Even at debug level, the log can be a distraction when its not really used.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, maybe consider if moving the boto validation code elsewhere is nicer.

And then you could do:

self._validate_boto_version()

It would make the code more direct. This method verification is a necessary nuisance which isn't at the heart of this operation and has enough code to justify its own method.

(it would also be nice to see all missing methods and not just the first, but that is also not a big deal).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed.

# Return if Time to Live is not desired anyway.
logger.debug(message)
return
else:
# Raise exception if Time to Live should be enabled.
logger.error(message)
raise AttributeError(
"boto3 method '{}' not found".format(method)
)

# Get the current TTL description.
try:
description = self._client.describe_time_to_live(
TableName=self.table_name
)
status = description['TimeToLiveDescription']['TimeToLiveStatus']
except ClientError as e:
error_code = e.response['Error'].get('Code', 'Unknown')
error_message = e.response['Error'].get('Message', 'Unknown')
logger.error((
'Error describing Time to Live on DynamoDB table {table}: '
'{code}: {message}'
).format(
table=self.table_name,
code=error_code,
message=error_message,
))
raise e
thedrow marked this conversation as resolved.
Show resolved Hide resolved

# Return early when possible.
if status in ('ENABLED', 'ENABLING'):
cur_attr_name = \
description['TimeToLiveDescription']['AttributeName']
if self._has_ttl():
if cur_attr_name == self._ttl_field.name:
# We want TTL enabled, and it is currently enabled or being
# enabled, and on the correct attribute.
logger.debug((
'DynamoDB Time to Live is {situation} '
'on table {table}'
).format(
situation='already enabled' \
if status == 'ENABLED' \
else 'currently being enabled',
table=self.table_name
))
return description

elif status in ('DISABLED', 'DISABLING'):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elif -> if ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this one. I've left it as an if, elif, else now (added the else just now), which I think makes sense. Let me know if I should re-do it, though.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm no authority but elif usually makes me look at the previous condition. It satisfies the negation of previous condition and the following condition. I don't think its needed here (or anywhere where you have short-circuiting if ... return/raise))

IMO if is less interleaved than elif and easier to read.

But its fine either way, just wanted you to consider it.

if not self._has_ttl():
# We want TTL disabled, and it is currently disabled or being
# disabled.
logger.debug((
'DynamoDB Time to Live is {situation} '
'on table {table}'
).format(
situation='already disabled' \
if status == 'DISABLED' \
else 'currently being disabled',
table=self.table_name
))
return description

# At this point, we have one of the following situations:
#
# We want TTL enabled,
#
# - and it's currently disabled: Try to enable.
#
# - and it's being disabled: Try to enable, but this is almost sure to
# raise ValidationException with message:
#
# Time to live has been modified multiple times within a fixed
# interval
#
# - and it's currently enabling or being enabled, but on the wrong
# attribute: Try to enable, but this will raise ValidationException
# with message:
#
# TimeToLive is active on a different AttributeName: current
# AttributeName is ttlx
#
# We want TTL disabled,
#
# - and it's currently enabled: Try to disable.
#
# - and it's being enabled: Try to disable, but this is almost sure to
# raise ValidationException with message:
#
# Time to live has been modified multiple times within a fixed
# interval
#
attr_name = \
cur_attr_name if status == 'ENABLED' else self._ttl_field.name
try:
specification = self._client.update_time_to_live(
**self._get_ttl_specification(
ttl_attr_name=attr_name
)
)
logger.info(
(
'DynamoDB table Time to Live updated: '
'table={table} enabled={enabled} attribute={attr}'
).format(
table=self.table_name,
enabled=self._has_ttl(),
attr=self._ttl_field.name
)
)
return specification
except ClientError as e:
error_code = e.response['Error'].get('Code', 'Unknown')
error_message = e.response['Error'].get('Message', 'Unknown')
logger.error((
'Error {action} Time to Live on DynamoDB table {table}: '
'{code}: {message}'
).format(
action='enabling' if self._has_ttl() else 'disabling',
table=self.table_name,
code=error_code,
message=error_message,
))
raise e

def _wait_for_table_status(self, expected='ACTIVE'):
"""Poll for the expected table status."""
achieved_state = False
Expand Down Expand Up @@ -236,7 +405,8 @@ def _prepare_get_request(self, key):

def _prepare_put_request(self, key, value):
"""Construct the item creation request parameters."""
return {
timestamp = time()
put_request = {
'TableName': self.table_name,
'Item': {
self._key_field.name: {
Expand All @@ -246,10 +416,18 @@ def _prepare_put_request(self, key, value):
self._value_field.data_type: value
},
self._timestamp_field.name: {
self._timestamp_field.data_type: str(time())
self._timestamp_field.data_type: str(timestamp)
}
}
}
if self._has_ttl():
put_request['Item'].update({
self._ttl_field.name: {
self._ttl_field.data_type:
str(int(timestamp + self.time_to_live_seconds))
}
})
return put_request

def _item_to_dict(self, raw_response):
"""Convert get_item() response to field-value pairs."""
Expand Down
11 changes: 11 additions & 0 deletions docs/userguide/configuration.rst
Expand Up @@ -1572,6 +1572,17 @@ The fields of the DynamoDB URL in ``result_backend`` are defined as follows:
The Read & Write Capacity Units for the created DynamoDB table. Default is ``1`` for both read and write.
More details can be found in the `Provisioned Throughput documentation <http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html>`_.

#. ``ttl_seconds``
thedrow marked this conversation as resolved.
Show resolved Hide resolved

Time-to-live (in seconds) for results before they expire. Default is
disabled, meaning results are not expired. The DynamoDB table must have
Time to Live enabled for expiry to work. Time to Live will be enabled on
the table if ``ttl_seconds`` is set; otherwise, Time to Live will be
disabled. Note that trying to change a table's Time to Live setting
multiple times in quick succession will cause a throttling error. More
details can be found in the
`DynamoDB TTL documentation <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html>`_

.. _conf-ironcache-result-backend:

IronCache backend settings
Expand Down
2 changes: 1 addition & 1 deletion requirements/extras/dynamodb.txt
@@ -1 +1 @@
boto3>=1.9.125
boto3>=1.9.178