Add special exception for "host field is not hashable" #23136

Closed
wants to merge 2 commits into from
airflow/providers/elasticsearch/log/es_task_handler.py (13 changes: 12 additions & 1 deletion)
@@ -19,6 +19,7 @@
 import logging
 import sys
 from collections import defaultdict
+from collections.abc import Hashable
 from datetime import datetime
 from operator import attrgetter
 from time import time
@@ -153,7 +154,17 @@ def _group_logs_by_host(self, logs):
         grouped_logs = defaultdict(list)
         for log in logs:
             key = getattr(log, self.host_field, 'default_host')
-            grouped_logs[key].append(log)

+            try:
+                grouped_logs[key].append(log)
+            except TypeError as e:
+                if not isinstance(key, Hashable):
+                    raise ValueError("The host field in all log records needs to be hashable. "
+                                     "If you are using filebeat, read here: "
+                                     "https://github.com/apache/airflow/issues/15613#issuecomment-1104487752") from e

Member:
Instead of linking to GitHub, we should have documentation for this.

Member:
Actually, a better solution would be to copy the explanation into our Elasticsearch documentation (at airflow.apache.org) and link to it from the error message. The error message should explain the reason and link to the detailed discussion/explanation of why. Linking to an issue is fine only in a source comment, not in a user-facing message; there, we should only link to documentation we control.

Author:
Got your point. I just got set up with Breeze to write some proper documentation.

I have a question: airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler has offset_field and host_field parameters in its constructor. I have a hard time figuring out where these are set and where they come from. Are they configurable?

Member:
To be honest, I don't know the details. Like you, I would have to dive deep into the code to understand it.

Member:
Actually, I do know this (after re-reading it): I believe each logging handler can be configured with parameters. You can read about it in the "logging" configuration section of our docs.

else:
raise # Type error happened for another reason.


# return items sorted by timestamp.
result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))
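The try/except logic added in this diff can be exercised outside Airflow with a small sketch. It copies the patched loop into a standalone function; `group_logs_by_host` and the `SimpleNamespace` records are hypothetical stand-ins for the handler method and the Elasticsearch hits, the error message is shortened, and the nested `{"name": ...}` host value is an assumed example of an unhashable field such as a nested host object.

```python
from collections import defaultdict
from collections.abc import Hashable
from types import SimpleNamespace


def group_logs_by_host(logs, host_field="host"):
    """Hypothetical standalone copy of the patched grouping loop."""
    grouped_logs = defaultdict(list)
    for log in logs:
        # Records without the host attribute fall back to "default_host".
        key = getattr(log, host_field, "default_host")
        try:
            grouped_logs[key].append(log)
        except TypeError as e:
            if not isinstance(key, Hashable):
                # e.g. a nested mapping such as {"name": "worker-1"}
                raise ValueError(
                    "The host field in all log records needs to be hashable."
                ) from e
            raise  # the TypeError happened for another reason


    return dict(grouped_logs)


flat = [
    SimpleNamespace(host="worker-1", message="a"),
    SimpleNamespace(host="worker-1", message="b"),
    SimpleNamespace(message="c"),  # no host attribute
]
print({host: len(recs) for host, recs in group_logs_by_host(flat).items()})
# {'worker-1': 2, 'default_host': 1}

nested = [SimpleNamespace(host={"name": "worker-1"}, message="d")]
try:
    group_logs_by_host(nested)
except ValueError as err:
    print(err)  # The host field in all log records needs to be hashable.
```

The bare `raise` branch matters for values such as `("worker-1", [])`: a tuple passes the `isinstance(key, Hashable)` check because `tuple` defines `__hash__`, yet hashing it still fails because it contains a list, so the original `TypeError` propagates unchanged instead of being reported as a host-field problem.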
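The reviewer's point that each logging handler can be configured with parameters can be illustrated with the standard library's `logging.config.dictConfig`, which passes the remaining keys of a handler entry (everything except special keys such as `class`, `()`, `level`, `formatter` and `filters`) to the handler's constructor as keyword arguments. This is a minimal sketch: `HostFieldHandler`, the `demo.task` logger and the field values are hypothetical stand-ins, not Airflow's actual handler or configuration.

```python
import logging
import logging.config


class HostFieldHandler(logging.Handler):
    """Hypothetical handler that, like the task handler, takes host/offset field names."""

    def __init__(self, host_field="host", offset_field="offset"):
        super().__init__()
        self.host_field = host_field
        self.offset_field = offset_field

    def emit(self, record):
        # A real handler would ship the record somewhere; this sketch drops it.
        pass


LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        "task": {
            # "()" names the factory; the other keys become constructor kwargs.
            "()": HostFieldHandler,
            "host_field": "hostname",
            "offset_field": "offset",
        },
    },
    "loggers": {
        "demo.task": {"handlers": ["task"], "level": "INFO", "propagate": False},
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
handler = logging.getLogger("demo.task").handlers[0]
print(handler.host_field, handler.offset_field)  # hostname offset
```

Whether Airflow's default logging configuration exposes `host_field` and `offset_field` through such a dictionary is not established by this thread; the sketch only shows the general mechanism.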