Skip to content

Commit

Permalink
Added hint for cases like apache#15613
Browse files Browse the repository at this point in the history
  • Loading branch information
NiklasBeierl committed May 13, 2022
1 parent 2111d73 commit 79b96e0
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions airflow/providers/elasticsearch/log/es_task_handler.py
Expand Up @@ -22,7 +22,7 @@
from datetime import datetime
from operator import attrgetter
from time import time
from typing import List, Optional, Tuple, Union
from typing import Hashable, List, Optional, Tuple, Union
from urllib.parse import quote

# Using `from elasticsearch import *` would break elasticsearch mocking used in unit test.
Expand Down Expand Up @@ -153,7 +153,17 @@ def _group_logs_by_host(self, logs):
grouped_logs = defaultdict(list)
for log in logs:
key = getattr(log, self.host_field, 'default_host')
grouped_logs[key].append(log)

try:
grouped_logs[key].append(log)
except TypeError as e:
if not isinstance(key, Hashable):
raise ValueError("The host field in all log records needs to be hashable. "
"If you are using filebeat, read here: "
"https://github.com/apache/airflow/issues/15613#issuecomment-1104487752") from e
else:
raise # Type error happened for another reason.


# return items sorted by timestamp.
result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))
Expand Down

0 comments on commit 79b96e0

Please sign in to comment.