
ElasticSearch can't parse celery result correctly #5660

Closed
7 of 16 tasks
geekkun opened this issue Jul 26, 2019 · 17 comments

@geekkun

geekkun commented Jul 26, 2019

Checklist

  • I have read the relevant section in the
    contribution guide
    on reporting bugs.
  • I have checked the issues list
    for similar or identical bug reports.
  • I have checked the pull requests list
    for existing proposed fixes.
  • I have checked the commit log
    to find out if the bug was already fixed in the master branch.
  • I have included all related issues and possible duplicate issues
    in this issue (If there are none, check this box anyway).

Mandatory Debugging Information

  • I have included the output of celery -A proj report in the issue.
    (if you are not able to do this, then at least specify the Celery
    version affected).
  • I have verified that the issue exists against the master branch of Celery.
  • I have included the contents of pip freeze in the issue.
  • I have included all the versions of all the external dependencies required
    to reproduce this bug.

Optional Debugging Information

  • I have tried reproducing the issue on more than one Python version
    and/or implementation.
  • I have tried reproducing the issue on more than one message broker and/or
    result backend.
  • I have tried reproducing the issue on more than one version of the message
    broker and/or result backend.
  • I have tried reproducing the issue on more than one operating system.
  • I have tried reproducing the issue on more than one workers pool.
  • I have tried reproducing the issue with autoscaling, retries,
    ETA/Countdown & rate limits disabled.
  • I have tried reproducing the issue after downgrading
    and/or upgrading Celery and its dependencies.

Related Issues and Possible Duplicates

Related Issues

  • None

Possible Duplicates

  • None

Environment & Settings

Celery version: 4.3.0

celery report Output:

Steps to Reproduce

1. Configure Elasticsearch as the result backend.
2. Run any task.

Required Dependencies

  • Minimal Python Version: '3.7.1 (default, Nov 6 2018, 18:46:03) \n[Clang 10.0.0 (clang-1000.11.45.5)]'
  • Minimal Celery Version: 4.3.0
  • Minimal Kombu Version: N/A or Unknown
  • Minimal Broker Version: N/A or Unknown
  • Minimal Result Backend Version: N/A or Unknown
  • Minimal OS and/or Kernel Version: System Version: macOS 10.14.5 (18F132) Darwin 18.6.0
  • Minimal Broker Client Version: N/A or Unknown
  • Minimal Result Backend Client Version: Elasticsearch 7.0.0

Python Packages

pip freeze Output:

Other Dependencies

N/A

Minimally Reproducible Test Case

Expected Behavior

The result should be stored as a dictionary: when the body dictionary is built, the result is added as a nested dictionary, not as a string. The correct mapping is then created, results are displayed correctly in Elasticsearch, and the nested type can be used.

Correct mapping example:

{
  "celery" : {
    "mappings" : {
      "backend" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "result" : {
            "properties" : {
              "date_done" : {
                "type" : "date"
              },
              "result" : {
                "type" : "long"
              },
              "status" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "task_id" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}

Kibana: [screenshot, 2019-07-26 17:49]

Actual Behavior

The result value is serialized into a JSON string, so when the body dictionary is built, the result is added as a string. Elasticsearch can't parse it correctly.
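
A minimal sketch of the double encoding described above (the meta values here are illustrative, not taken from a real task):

```python
import json

# Illustrative task meta, roughly the shape Celery stores.
meta = {'task_id': 'abc', 'status': 'SUCCESS', 'result': 42}

# The backend JSON-encodes the whole meta dict before indexing...
encoded = json.dumps(meta)

# ...so the document body holds a string, which Elasticsearch maps
# as 'text' instead of an object with nested fields.
body = {'result': encoded, '@timestamp': '2019-07-26T17:49:27.000Z'}
```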

Actual mapping:

{
  "celery" : {
    "mappings" : {
      "backend" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "result" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          }
        }
      }
    }
  }
}

Kibana: [screenshot, 2019-07-24 20:30]

My stackoverflow question

Solution

In my fork I've imported the json package and added json.loads(value):

# celery/backends/elasticsearch.py
# (datetime and elasticsearch are already imported by the module)
import json

def set(self, key, value):
    try:
        self._index(
            id=key,
            body={
                'result': json.loads(value),
                '@timestamp': '{0}Z'.format(
                    datetime.utcnow().isoformat()[:-3]
                ),
            },
        )
    except elasticsearch.exceptions.ConflictError:
        # document already exists, update it
        data = self.get(key)
        data[key] = value
        self._index(key, data, refresh=True)

Is there a way to disable JSON serialization of results for the Elasticsearch backend? If not, is this solution good enough for a pull request?

@safwanrahman
Contributor

This depends on #5661

@thedrow
Member

thedrow commented Aug 6, 2019

The proposed patch is not enough.
JSON is not the only serialization method for result backends.
The correct solution in my opinion is to create a serializer which does nothing and use that as the result serializer.
Can you try that?

@sover02

sover02 commented Oct 2, 2019

Hey folks, just wanted to shamelessly add a +1. My use case is the same, but I'm afraid I don't have the skill set to improve it in the way @thedrow mentioned.

@thedrow
Member

thedrow commented Oct 13, 2019

Of course you do. See the documentation for details.
Essentially the serializer should be something similar to:

from kombu import serialization

serialization.register(
    'elasticsearch', lambda x: x, lambda x: x,
    content_type='application/x-elasticsearch',
)

Try using it and let me know if that resolves your issue.

@auvipy auvipy added this to the 4.4.x milestone Dec 16, 2019
@andyshinn

I'm confirming that @thedrow's example does indeed work for me. I registered the serializer before creating my Celery app object and then set result_serializer = "elasticsearch". Results are now structured properly in Elasticsearch.

Keep in mind, this changes the field from a string to an object, and Elasticsearch won't let you do that in the same index. It's better to version the index name so the types don't clash.

@andyshinn

Scratch that! There is an issue with this method. Unless you can guarantee the result field will always be the same type, inserts will fail when types clash, with an error such as:

[2020-01-17 21:58:58,676: ERROR/MainProcess] Pool callback raised exception: RequestError(400, 'mapper_parsing_exception', {'error': {'root_cause': [{'type': 'mapper_parsing_exception', 'reason': "failed to parse field [result.result] of type [float] in document with id 'celery-task-meta-3df75830-a580-4aa2-a797-c7328b545159'"}], 'type': 'mapper_parsing_exception', 'reason': "failed to parse field [result.result] of type [float] in document with id 'celery-task-meta-3df75830-a580-4aa2-a797-c7328b545159'", 'caused_by': {'type': 'json_parse_exception', 'reason': 'Current token (START_OBJECT) not numeric, can not use numeric value accessors\n at [Source: org.elasticsearch.common.bytes.BytesReference$MarkSupportingStreamInputWrapper@793084b7; line: 1, column: 41]'}}, 'status': 400})
Traceback (most recent call last):
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/billiard/pool.py", line 1791, in safe_apply_callback
    fun(*args, **kwargs)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/celery/worker/request.py", line 528, in on_failure
    store_result=self.store_errors,
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/celery/backends/base.py", line 160, in mark_as_failure
    traceback=traceback, request=request)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/celery/backends/base.py", line 407, in store_result
    request=request, **kwargs)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/celery/backends/base.py", line 758, in _store_result
    self.set(self.get_key_for_task(task_id), self.encode(meta))
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/celery/backends/elasticsearch.py", line 109, in set
    datetime.utcnow().isoformat()[:-3]
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/celery/backends/elasticsearch.py", line 126, in _index
    **kwargs
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/elasticsearch/client/utils.py", line 76, in _wrapped
    return func(*args, params=params, **kwargs)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/elasticsearch/client/__init__.py", line 300, in index
    _make_path(index, doc_type, id), params=params, body=body)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/elasticsearch/transport.py", line 314, in perform_request
    status, headers_response, data = connection.perform_request(method, url, params, body, headers=headers, ignore=ignore, timeout=timeout)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/elasticsearch/connection/http_urllib3.py", line 163, in perform_request
    self._raise_error(response.status, raw_data)
  File "/home/findmine/virtualenv/lib/python3.6/site-packages/elasticsearch/connection/base.py", line 125, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
elasticsearch.exceptions.RequestError: TransportError(400, 'mapper_parsing_exception', "failed to parse field [result.result] of type [float] in document with id 'celery-task-meta-3df75830-a580-4aa2-a797-c7328b545159'")

It seems the result itself needs to be serialized in some way (perhaps the result stays a JSON string while everything else becomes native fields).

@georgepsarakis
Contributor

@andyshinn due to Elasticsearch's dynamic mapping feature, the mapping is created when the first document containing a field is indexed. You cannot alter the field type in subsequent indexing requests, since that would cause a mapping incompatibility. I don't think we can find a generic solution for this issue, but it could be handled by your custom serializer instead.

@imadmoussa1

Any updates regarding this issue?
I tried several solutions but nothing worked for me. I need the result saved as an object rather than as text, so that I can build visualizations in Kibana.

@thedrow
Member

thedrow commented Sep 2, 2020

None unfortunately.
PRs are welcome if you can come up with a good solution.

@safwanrahman
Contributor

As #5661 is fixed, I think it is easier now to make a patch for this.

@safwanrahman
Contributor

This should be fixed by #6141. You can now set elasticsearch_save_meta_as_text to False to save the task metadata as JSON.

@safwanrahman
Contributor

@auvipy you can close this issue! 👍

@auvipy auvipy modified the milestones: 4.4.x, Future Feb 16, 2021
@auvipy auvipy closed this as completed Feb 16, 2021
@auvipy auvipy modified the milestones: Future, 4.4.x Feb 16, 2021
@auvipy
Member

auvipy commented Feb 16, 2021

thanks @safwanrahman

@thedrow
Member

thedrow commented Feb 21, 2021

@auvipy that's the wrong milestone, isn't it?

@auvipy
Member

auvipy commented Feb 21, 2021

@auvipy that's the wrong milestone, isn't it?

Yes, it should be on 4.4.0, which is closed.

@auvipy auvipy removed this from the 4.4.x milestone Feb 21, 2021
@thedrow
Member

thedrow commented Feb 21, 2021

Was this released on 4.4.0?

@auvipy
Member

auvipy commented Feb 21, 2021

No, sorry. I rechecked and found https://github.com/celery/celery/blob/v4.5/Changelog.rst#444, so the 4.4.x milestone should be OK for the already-released version.

@auvipy auvipy added this to the 4.4.x milestone Feb 21, 2021
8 participants