Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix #5436] Store extending result in all backends #5661

Merged
merged 8 commits into from Nov 29, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 51 additions & 34 deletions celery/backends/base.py
Expand Up @@ -351,6 +351,54 @@ def encode_result(self, result, state):
def is_cached(self, task_id):
return task_id in self._cache

def _get_result_meta(self, result,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

particularly this block and related changes tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@auvipy Do you mean writing new tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes some more test

state, traceback, request, format_date=True,
encode=False):
if state in self.READY_STATES:
date_done = datetime.datetime.utcnow()
if format_date:
date_done = date_done.isoformat()
else:
date_done = None

meta = {
'status': state,
'result': result,
'traceback': traceback,
'children': self.current_task_children(request),
'date_done': date_done,
}

if request and getattr(request, 'group', None):
meta['group_id'] = request.group
if request and getattr(request, 'parent_id', None):
meta['parent_id'] = request.parent_id

if self.app.conf.find_value_for_key('extended', 'result'):
if request:
request_meta = {
'name': getattr(request, 'task', None),
'args': getattr(request, 'args', None),
'kwargs': getattr(request, 'kwargs', None),
'worker': getattr(request, 'hostname', None),
'retries': getattr(request, 'retries', None),
'queue': request.delivery_info.get('routing_key')
if hasattr(request, 'delivery_info') and
request.delivery_info else None
}

if encode:
# args and kwargs need to be encoded properly before saving
encode_needed_fields = {"args", "kwargs"}
for field in encode_needed_fields:
value = request_meta[field]
encoded_value = self.encode(value)
request_meta[field] = ensure_bytes(encoded_value)

meta.update(request_meta)

return meta

def store_result(self, task_id, result, state,
traceback=None, request=None, **kwargs):
"""Update task state and result."""
Expand Down Expand Up @@ -703,40 +751,9 @@ def _forget(self, task_id):

def _store_result(self, task_id, result, state,
traceback=None, request=None, **kwargs):

if state in self.READY_STATES:
date_done = datetime.datetime.utcnow().isoformat()
else:
date_done = None

meta = {
'status': state,
'result': result,
'traceback': traceback,
'children': self.current_task_children(request),
'task_id': bytes_to_str(task_id),
'date_done': date_done,
}

if request and getattr(request, 'group', None):
meta['group_id'] = request.group
if request and getattr(request, 'parent_id', None):
meta['parent_id'] = request.parent_id

if self.app.conf.find_value_for_key('extended', 'result'):
if request:
request_meta = {
'name': getattr(request, 'task', None),
'args': getattr(request, 'args', None),
'kwargs': getattr(request, 'kwargs', None),
'worker': getattr(request, 'hostname', None),
'retries': getattr(request, 'retries', None),
'queue': request.delivery_info.get('routing_key')
if hasattr(request, 'delivery_info') and
request.delivery_info else None
}

meta.update(request_meta)
meta = self._get_result_meta(result=result, state=state,
traceback=traceback, request=request)
meta['task_id'] = bytes_to_str(task_id)

self.set(self.get_key_for_task(task_id), self.encode(meta))
return result
Expand Down
36 changes: 17 additions & 19 deletions celery/backends/database/__init__.py
Expand Up @@ -5,7 +5,6 @@
import logging
from contextlib import contextmanager

from kombu.utils.encoding import ensure_bytes
from vine.utils import wraps

from celery import states
Expand Down Expand Up @@ -120,6 +119,7 @@ def _store_result(self, task_id, result, state, traceback=None,
task = task and task[0]
if not task:
task = self.task_cls(task_id)
task.task_id = task_id
session.add(task)
session.flush()

Expand All @@ -128,24 +128,22 @@ def _store_result(self, task_id, result, state, traceback=None,

def _update_result(self, task, result, state, traceback=None,
request=None):
task.result = result
task.status = state
task.traceback = traceback
if self.app.conf.find_value_for_key('extended', 'result'):
task.name = getattr(request, 'task', None)
task.args = ensure_bytes(
self.encode(getattr(request, 'args', None))
)
task.kwargs = ensure_bytes(
self.encode(getattr(request, 'kwargs', None))
)
task.worker = getattr(request, 'hostname', None)
task.retries = getattr(request, 'retries', None)
task.queue = (
request.delivery_info.get("routing_key")
if hasattr(request, "delivery_info") and request.delivery_info
else None
)

meta = self._get_result_meta(result=result, state=state,
traceback=traceback, request=request,
format_date=False, encode=True)

# Exclude the primary key id and task_id columns
# as we should not set it None
columns = [column.name for column in self.task_cls.__table__.columns
if column.name not in {'id', 'task_id'}]

# Iterate through the columns name of the table
# to set the value from meta.
# If the value is not present in meta, set None
for column in columns:
value = meta.get(column)
setattr(task, column, value)

@retry
def _get_task_meta_for(self, task_id):
Expand Down
16 changes: 4 additions & 12 deletions celery/backends/mongodb.py
Expand Up @@ -185,18 +185,10 @@ def decode(self, data):
def _store_result(self, task_id, result, state,
traceback=None, request=None, **kwargs):
"""Store return value and state of an executed task."""
meta = {
'_id': task_id,
'status': state,
'result': self.encode(result),
'date_done': datetime.utcnow(),
'traceback': self.encode(traceback),
'children': self.encode(
self.current_task_children(request),
),
}
if request and getattr(request, 'parent_id', None):
meta['parent_id'] = request.parent_id
meta = self._get_result_meta(result=result, state=state,
traceback=traceback, request=request)
# Add the _id for mongodb
meta['_id'] = task_id

try:
self.collection.replace_one({'_id': task_id}, meta, upsert=True)
Expand Down