From 79b96e05d98273b66ffe98a0bc3ef6e922c8c531 Mon Sep 17 00:00:00 2001 From: Niklas Beierl <22919961+NiklasBeierl@users.noreply.github.com> Date: Thu, 21 Apr 2022 00:16:35 +0200 Subject: [PATCH] Added hint for cases like #15613 --- .../providers/elasticsearch/log/es_task_handler.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 83c1163d80c87..3bfa3897c9819 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -22,7 +22,7 @@ from datetime import datetime from operator import attrgetter from time import time -from typing import List, Optional, Tuple, Union +from typing import Hashable, List, Optional, Tuple, Union from urllib.parse import quote # Using `from elasticsearch import *` would break elasticsearch mocking used in unit test. @@ -153,7 +153,17 @@ def _group_logs_by_host(self, logs): grouped_logs = defaultdict(list) for log in logs: key = getattr(log, self.host_field, 'default_host') - grouped_logs[key].append(log) + + try: + grouped_logs[key].append(log) + except TypeError as e: + if not isinstance(key, Hashable): + raise ValueError("The host field in all log records needs to be hashable. " + "If you are using filebeat, read here: " + "https://github.com/apache/airflow/issues/15613#issuecomment-1104487752") from e + else: + raise # Type error happened for another reason. + # return items sorted by timestamp. result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))