Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add auto expiry for DynamoDB backend #5805

Merged
merged 13 commits into from Nov 25, 2019
52 changes: 50 additions & 2 deletions celery/backends/dynamodb.py
Expand Up @@ -51,9 +51,16 @@ class DynamoDBBackend(KeyValueStoreBackend):
#: The endpoint URL that is passed to boto3 (local DynamoDB) (`default`)
endpoint_url = None

#: Item time-to-live in seconds (`default`)
time_to_live_seconds = None

# DynamoDB supports Time to Live as an auto-expiry mechanism.
supports_autoexpire = True

_key_field = DynamoDBAttribute(name='id', data_type='S')
_value_field = DynamoDBAttribute(name='result', data_type='B')
_timestamp_field = DynamoDBAttribute(name='timestamp', data_type='N')
_ttl_field = DynamoDBAttribute(name='ttl', data_type='N')
georgepsarakis marked this conversation as resolved.
Show resolved Hide resolved
_available_fields = None

def __init__(self, url=None, table_name=None, *args, **kwargs):
Expand Down Expand Up @@ -118,6 +125,17 @@ def __init__(self, url=None, table_name=None, *args, **kwargs):
self.write_capacity_units
)
)

try:
thedrow marked this conversation as resolved.
Show resolved Hide resolved
self.time_to_live_seconds = int(
query.get(
'ttl_seconds',
self.time_to_live_seconds
)
)
except Exception:
pass

self.table_name = table or self.table_name

self._available_fields = (
Expand Down Expand Up @@ -177,6 +195,16 @@ def _get_table_schema(self):
}
}

def _get_ttl_specification(self):
"""Get the boto3 structure describing the DynamoDB TTL specification."""
return {
'TableName': self.table_name,
'TimeToLiveSpecification': {
'Enabled': True,
'AttributeName': self._ttl_field.name
}
}

def _get_or_create_table(self):
"""Create table if not exists, otherwise return the description."""
table_schema = self._get_table_schema()
Expand All @@ -194,6 +222,17 @@ def _get_or_create_table(self):
self.table_name
)
)
# Enable time-to-live on the table, ignoring whether or not TTL is
# currently specified as an option for the backend. This allows
# enabling it later. Until then, items are inserted without a value
# in the ttl field, meaning that DynamoDB will never expire them.
self._client.update_time_to_live(**self._get_ttl_specification())
thedrow marked this conversation as resolved.
Show resolved Hide resolved
thedrow marked this conversation as resolved.
Show resolved Hide resolved
logger.info(
'DynamoDB Table {} time-to-live enabled on field {}.'.format(
self.table_name,
self._ttl_field.name
)
)
return table_description
except ClientError as e:
error_code = e.response['Error'].get('Code', 'Unknown')
Expand Down Expand Up @@ -236,7 +275,8 @@ def _prepare_get_request(self, key):

def _prepare_put_request(self, key, value):
"""Construct the item creation request parameters."""
return {
timestamp = time()
put_request = {
'TableName': self.table_name,
'Item': {
self._key_field.name: {
Expand All @@ -246,10 +286,18 @@ def _prepare_put_request(self, key, value):
self._value_field.data_type: value
},
self._timestamp_field.name: {
self._timestamp_field.data_type: str(time())
self._timestamp_field.data_type: str(timestamp)
}
}
}
if self.time_to_live_seconds is not None:
put_request['Item'].update({
self._ttl_field.name: {
self._ttl_field.data_type:
str(int(timestamp + self.time_to_live_seconds))
}
})
return put_request

def _item_to_dict(self, raw_response):
"""Convert get_item() response to field-value pairs."""
Expand Down
8 changes: 8 additions & 0 deletions docs/userguide/configuration.rst
Expand Up @@ -1572,6 +1572,14 @@ The fields of the DynamoDB URL in ``result_backend`` are defined as follows:
The Read & Write Capacity Units for the created DynamoDB table. Default is ``1`` for both read and write.
More details can be found in the `Provisioned Throughput documentation <http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html>`_.

#. ``ttl_seconds``
thedrow marked this conversation as resolved.
Show resolved Hide resolved

Time-to-live (in seconds) for results before they expire. Default is
disabled, meaning results are not expired. The DynamoDB table must have
Time to Live enabled for expiry to work. If the table doesn't exist, it is
created with Time to Live enabled. More details can be found in the
`DynamoDB TTL documentation <https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html>`_

.. _conf-ironcache-result-backend:

IronCache backend settings
Expand Down
2 changes: 1 addition & 1 deletion requirements/extras/dynamodb.txt
@@ -1 +1 @@
boto3>=1.9.125
boto3>=1.9.178
57 changes: 55 additions & 2 deletions t/unit/backends/test_dynamodb.py
Expand Up @@ -100,6 +100,8 @@ def test_get_or_create_table_not_exists(self):
mock_create_table = self.backend._client.create_table = MagicMock()
mock_describe_table = self.backend._client.describe_table = \
MagicMock()
mock_update_time_to_live = self.backend._client.update_time_to_live = \
MagicMock()

mock_describe_table.return_value = {
'Table': {
Expand All @@ -111,6 +113,9 @@ def test_get_or_create_table_not_exists(self):
mock_create_table.assert_called_once_with(
**self.backend._get_table_schema()
)
mock_update_time_to_live.assert_called_once_with(
**self.backend._get_ttl_specification()
)

def test_get_or_create_table_already_exists(self):
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -180,6 +185,25 @@ def test_prepare_put_request(self):
result = self.backend._prepare_put_request('abcdef', 'val')
assert result == expected

def test_prepare_put_request_with_ttl(self):
ttl = self.backend.time_to_live_seconds = 30
expected = {
'TableName': u'celery',
'Item': {
u'id': {u'S': u'abcdef'},
u'result': {u'B': u'val'},
u'timestamp': {
u'N': str(Decimal(self._static_timestamp))
},
u'ttl': {
u'N': str(int(self._static_timestamp + ttl))
}
}
}
with patch('celery.backends.dynamodb.time', self._mock_time):
result = self.backend._prepare_put_request('abcdef', 'val')
assert result == expected

def test_item_to_dict(self):
boto_response = {
'Item': {
Expand Down Expand Up @@ -236,6 +260,30 @@ def test_set(self):
assert call_kwargs['Item'] == expected_kwargs['Item']
assert call_kwargs['TableName'] == 'celery'

def test_set_with_ttl(self):
ttl = self.backend.time_to_live_seconds = 30

self.backend._client = MagicMock()
self.backend._client.put_item = MagicMock()

# should return None
with patch('celery.backends.dynamodb.time', self._mock_time):
assert self.backend.set(sentinel.key, sentinel.value) is None

assert self.backend._client.put_item.call_count == 1
_, call_kwargs = self.backend._client.put_item.call_args
expected_kwargs = {
'Item': {
u'timestamp': {u'N': str(self._static_timestamp)},
u'id': {u'S': string(sentinel.key)},
u'result': {u'B': sentinel.value},
u'ttl': {u'N': str(int(self._static_timestamp + ttl))},
},
'TableName': 'celery'
}
assert call_kwargs['Item'] == expected_kwargs['Item']
assert call_kwargs['TableName'] == 'celery'

def test_delete(self):
self.backend._client = Mock(name='_client')
mocked_delete = self.backend._client.delete = Mock('client.delete')
Expand All @@ -255,10 +303,15 @@ def test_backend_by_url(self, url='dynamodb://'):
assert url_ == url

def test_backend_params_by_url(self):
self.app.conf.result_backend = \
'dynamodb://@us-east-1/celery_results?read=10&write=20'
self.app.conf.result_backend = (
'dynamodb://@us-east-1/celery_results'
'?read=10'
'&write=20'
'&ttl_seconds=600'
)
assert self.backend.aws_region == 'us-east-1'
assert self.backend.table_name == 'celery_results'
assert self.backend.read_capacity_units == 10
assert self.backend.write_capacity_units == 20
assert self.backend.time_to_live_seconds == 600
assert self.backend.endpoint_url is None