From 95ecf0d5078fd99333077405da119ac6da80d943 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 5 Mar 2021 09:26:03 -0700 Subject: [PATCH 1/4] Support remote logging in elasticsearch with filebeat 7 Filebeat 7 renamed some fields (offset->log.offset and host->host.name), so allow the field names Airflow uses to be configured. Airflow isn't directly involved with getting the logs _to_ elasticsearch, so we should allow easy configuration to accommodate whatever tools are used in that process. --- .../airflow_local_settings.py | 4 ++++ airflow/config_templates/config.yml | 14 +++++++++++++ airflow/config_templates/default_airflow.cfg | 6 ++++++ .../elasticsearch/log/es_task_handler.py | 20 ++++++++++++------- .../elasticsearch/log/test_es_task_handler.py | 12 ++++++++++- 5 files changed, 48 insertions(+), 8 deletions(-) diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index 3c8ffb844bea6..b3705bf2eeec7 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -254,6 +254,8 @@ ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT') ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT') ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS') + ELASTICSEARCH_HOST_FIELD: str = conf.get('elasticsearch', 'HOST_FIELD') + ELASTICSEARCH_OFFSET_FIELD: str = conf.get('elasticsearch', 'OFFSET_FIELD') ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = { 'task': { @@ -268,6 +270,8 @@ 'write_stdout': ELASTICSEARCH_WRITE_STDOUT, 'json_format': ELASTICSEARCH_JSON_FORMAT, 'json_fields': ELASTICSEARCH_JSON_FIELDS, + 'host_field': ELASTICSEARCH_HOST_FIELD, + 'offset_field': ELASTICSEARCH_OFFSET_FIELD, }, } diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index
ba9e3a4e1d140..1628f1ef8a154 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1999,6 +1999,20 @@ type: string example: ~ default: "asctime, filename, lineno, levelname, message" + - name: host_field + description: | + The field where host name is stored (normally either `host` or `host.name`) + version_added: 2.0.2 + type: string + example: ~ + default: "host" + - name: offset_field + description: | + The field where offset is stored (normally either `offset` or `log.offset`) + version_added: 2.0.2 + type: string + example: ~ + default: "offset" - name: elasticsearch_configs description: ~ options: diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index 1fc3dea89b938..5b816744205ad 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -992,6 +992,12 @@ json_format = False # Log fields to also attach to the json output, if enabled json_fields = asctime, filename, lineno, levelname, message +# The field where host name is stored (normally either `host` or `host.name`) +host_field = host + +# The field where offset is stored (normally either `offset` or `log.offset`) +offset_field = offset + [elasticsearch_configs] use_ssl = False verify_certs = True diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 1ffc6b5f64ea6..5ce8cd060ae4c 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -20,6 +20,7 @@ import sys from collections import defaultdict from datetime import datetime +from operator import attrgetter from time import time from typing import List, Optional, Tuple from urllib.parse import quote @@ -71,6 +72,8 @@ def __init__( # pylint: disable=too-many-arguments write_stdout: bool, json_format: bool, json_fields: str, + host_field: str, + offset_field: 
str, host: str = "localhost:9200", frontend: str = "localhost:5601", es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"), @@ -94,6 +97,8 @@ def __init__( # pylint: disable=too-many-arguments self.write_stdout = write_stdout self.json_format = json_format self.json_fields = [label.strip() for label in json_fields.split(",")] + self.host_field = host_field + self.offset_field = offset_field self.handler = None self.context_set = False @@ -122,11 +127,10 @@ def _clean_execution_date(execution_date: datetime) -> str: """ return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f") - @staticmethod - def _group_logs_by_host(logs): + def _group_logs_by_host(self, logs): grouped_logs = defaultdict(list) for log in logs: - key = getattr(log, 'host', 'default_host') + key = getattr(log, self.host_field, 'default_host') grouped_logs[key].append(log) # return items sorted by timestamp. @@ -160,7 +164,7 @@ def _read( logs = self.es_read(log_id, offset, metadata) logs_by_host = self._group_logs_by_host(logs) - next_offset = offset if not logs else logs[-1].offset + next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1]) # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly # on the client. Sending as a string prevents this issue. @@ -227,14 +231,16 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list: :type metadata: dict """ # Offset is the unique key for sorting logs given log_id. 
- search = Search(using=self.client).query('match_phrase', log_id=log_id).sort('offset') + search = Search(using=self.client).query('match_phrase', log_id=log_id).sort(self.offset_field) - search = search.filter('range', offset={'gt': int(offset)}) + search = search.filter('range', **{self.offset_field: {'gt': int(offset)}}) max_log_line = search.count() if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata: try: if max_log_line > 0: - metadata['max_offset'] = search[max_log_line - 1].execute()[-1].offset + metadata['max_offset'] = attrgetter(self.offset_field)( + search[max_log_line - 1].execute()[-1] + ) else: metadata['max_offset'] = 0 except Exception: # pylint: disable=broad-except diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index 8a3a7a2b46108..b9ca5f26873e7 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -38,7 +38,7 @@ from .elasticmock import elasticmock -class TestElasticsearchTaskHandler(unittest.TestCase): +class TestElasticsearchTaskHandler(unittest.TestCase): # pylint: disable=too-many-instance-attributes DAG_ID = 'dag_for_testing_file_task_handler' TASK_ID = 'task_for_testing_file_log_handler' EXECUTION_DATE = datetime(2016, 1, 1) @@ -54,6 +54,8 @@ def setUp(self): self.write_stdout = False self.json_format = False self.json_fields = 'asctime,filename,lineno,levelname,message,exc_text' + self.host_field = 'host' + self.offset_field = 'offset' self.es_task_handler = ElasticsearchTaskHandler( self.local_log_location, self.filename_template, @@ -62,6 +64,8 @@ def setUp(self): self.write_stdout, self.json_format, self.json_fields, + self.host_field, + self.offset_field, ) self.es = elasticsearch.Elasticsearch( # pylint: disable=invalid-name @@ -103,6 +107,8 @@ def test_client_with_config(self): self.write_stdout, self.json_format, 
self.json_fields, + self.host_field, + self.offset_field, es_kwargs=es_conf, ) @@ -357,6 +363,8 @@ def test_render_log_id(self): self.write_stdout, self.json_format, self.json_fields, + self.host_field, + self.offset_field, ) log_id = self.es_task_handler._render_log_id(self.ti, 1) assert expected_log_id == log_id @@ -382,6 +390,8 @@ def test_get_external_log_url(self, es_frontend, expected_url): self.write_stdout, self.json_format, self.json_fields, + self.host_field, + self.offset_field, frontend=es_frontend, ) url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number) From 122979bc9ab1a0f8717ff425f94d3a3f9f086125 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 9 Jun 2021 11:51:51 -0600 Subject: [PATCH 2/4] Add tests --- .../elasticsearch/log/test_es_task_handler.py | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py index b9ca5f26873e7..40257221d12e4 100644 --- a/tests/providers/elasticsearch/log/test_es_task_handler.py +++ b/tests/providers/elasticsearch/log/test_es_task_handler.py @@ -282,6 +282,55 @@ def test_read_with_json_format(self): ) assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1] + def test_read_with_json_format_with_custom_offset_and_host_fields(self): + ts = pendulum.now() + formatter = logging.Formatter( + '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s' + ) + self.es_task_handler.formatter = formatter + self.es_task_handler.json_format = True + self.es_task_handler.host_field = "host.name" + self.es_task_handler.offset_field = "log.offset" + + self.body = { + 'message': self.test_message, + 'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1', + 'log': {'offset': 1}, + 'host': {'name': 'somehostname'}, + 'asctime': '2020-12-24 
19:25:00,962', + 'filename': 'taskinstance.py', + 'lineno': 851, + 'levelname': 'INFO', + } + self.es_task_handler.set_context(self.ti) + self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id) + + logs, _ = self.es_task_handler.read( + self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False} + ) + assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1] + + def test_read_with_custom_offset_and_host_fields(self): + ts = pendulum.now() + # Delete the existing log entry as it doesn't have the new offset and host fields + self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1) + + self.es_task_handler.host_field = "host.name" + self.es_task_handler.offset_field = "log.offset" + + self.body = { + 'message': self.test_message, + 'log_id': self.LOG_ID, + 'log': {'offset': 1}, + 'host': {'name': 'somehostname'}, + } + self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id) + + logs, _ = self.es_task_handler.read( + self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False} + ) + assert self.test_message == logs[0][0][1] + def test_close(self): formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') self.es_task_handler.formatter = formatter From cd7026f79caf088f4e8dd002155b4bd843c15467 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Wed, 9 Jun 2021 12:12:57 -0600 Subject: [PATCH 3/4] Bump version_added --- airflow/config_templates/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1628f1ef8a154..3d864efaaafb6 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2002,14 +2002,14 @@ - name: host_field description: | The field where host name is stored (normally either `host` or `host.name`) - 
version_added: 2.0.2 + version_added: 2.1.1 type: string example: ~ default: "host" - name: offset_field description: | The field where offset is stored (normally either `offset` or `log.offset`) - version_added: 2.0.2 + version_added: 2.1.1 type: string example: ~ default: "offset" From 7f3a2083cfc70ea3ae4a4945f80b4c4963df1809 Mon Sep 17 00:00:00 2001 From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> Date: Fri, 11 Jun 2021 09:09:25 -0600 Subject: [PATCH 4/4] Update airflow/providers/elasticsearch/log/es_task_handler.py Co-authored-by: Ash Berlin-Taylor --- airflow/providers/elasticsearch/log/es_task_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py index 5ce8cd060ae4c..ae08ecdb6a7e6 100644 --- a/airflow/providers/elasticsearch/log/es_task_handler.py +++ b/airflow/providers/elasticsearch/log/es_task_handler.py @@ -72,8 +72,8 @@ def __init__( # pylint: disable=too-many-arguments write_stdout: bool, json_format: bool, json_fields: str, - host_field: str, - offset_field: str, + host_field: str = "host", + offset_field: str = "offset", host: str = "localhost:9200", frontend: str = "localhost:5601", es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),