From 77060cdeeb6b82f4aa7891e9feb0cea677857525 Mon Sep 17 00:00:00 2001
From: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com>
Date: Fri, 11 Jun 2021 12:32:42 -0600
Subject: [PATCH] Support remote logging in elasticsearch with filebeat 7
 (#14625)

Filebeat 7 renamed some fields (offset -> log.offset and
host -> host.name), so allow the field names Airflow uses to be
configured. Airflow isn't directly involved with getting the logs _to_
elasticsearch, so we should allow easy configuration to accommodate
whatever tools are used in that process.

(cherry picked from commit 5cd0bf733b839951c075c54e808a595ac923c4e8)
---
 .../airflow_local_settings.py                 |  4 ++
 airflow/config_templates/config.yml           | 14 +++++
 airflow/config_templates/default_airflow.cfg  |  6 ++
 .../elasticsearch/log/es_task_handler.py      | 20 +++---
 .../elasticsearch/log/test_es_task_handler.py | 61 ++++++++++++++++++-
 5 files changed, 97 insertions(+), 8 deletions(-)

diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py
index 3c8ffb844bea6..b3705bf2eeec7 100644
--- a/airflow/config_templates/airflow_local_settings.py
+++ b/airflow/config_templates/airflow_local_settings.py
@@ -254,6 +254,8 @@
     ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
     ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
     ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS')
+    ELASTICSEARCH_HOST_FIELD: str = conf.get('elasticsearch', 'HOST_FIELD')
+    ELASTICSEARCH_OFFSET_FIELD: str = conf.get('elasticsearch', 'OFFSET_FIELD')
 
     ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
         'task': {
@@ -268,6 +270,8 @@
             'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
             'json_format': ELASTICSEARCH_JSON_FORMAT,
             'json_fields': ELASTICSEARCH_JSON_FIELDS,
+            'host_field': ELASTICSEARCH_HOST_FIELD,
+            'offset_field': ELASTICSEARCH_OFFSET_FIELD,
         },
     }
 
diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml
index 39d25395e12cb..dd2d48f61e1b5 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1992,6 +1992,20 @@
       type: string
       example: ~
       default: "asctime, filename, lineno, levelname, message"
+    - name: host_field
+      description: |
+        The field where host name is stored (normally either `host` or `host.name`)
+      version_added: 2.1.1
+      type: string
+      example: ~
+      default: "host"
+    - name: offset_field
+      description: |
+        The field where offset is stored (normally either `offset` or `log.offset`)
+      version_added: 2.1.1
+      type: string
+      example: ~
+      default: "offset"
 - name: elasticsearch_configs
   description: ~
   options:
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index bf033ef89f31a..f8e8588f5669a 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -989,6 +989,12 @@ json_format = False
 # Log fields to also attach to the json output, if enabled
 json_fields = asctime, filename, lineno, levelname, message
 
+# The field where host name is stored (normally either `host` or `host.name`)
+host_field = host
+
+# The field where offset is stored (normally either `offset` or `log.offset`)
+offset_field = offset
+
 [elasticsearch_configs]
 use_ssl = False
 verify_certs = True
diff --git a/airflow/providers/elasticsearch/log/es_task_handler.py b/airflow/providers/elasticsearch/log/es_task_handler.py
index 44e72bf472094..16e4d6502752c 100644
--- a/airflow/providers/elasticsearch/log/es_task_handler.py
+++ b/airflow/providers/elasticsearch/log/es_task_handler.py
@@ -20,6 +20,7 @@
 import sys
 from collections import defaultdict
 from datetime import datetime
+from operator import attrgetter
 from time import time
 from typing import List, Optional, Tuple
 from urllib.parse import quote
@@ -71,6 +72,8 @@ def __init__(  # pylint: disable=too-many-arguments
         write_stdout: bool,
         json_format: bool,
         json_fields: str,
+        host_field: str = "host",
+        offset_field: str = "offset",
         host: str = "localhost:9200",
         frontend: str = "localhost:5601",
         es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
@@ -94,6 +97,8 @@ def __init__(  # pylint: disable=too-many-arguments
         self.write_stdout = write_stdout
         self.json_format = json_format
         self.json_fields = [label.strip() for label in json_fields.split(",")]
+        self.host_field = host_field
+        self.offset_field = offset_field
         self.handler = None
         self.context_set = False
 
@@ -122,11 +127,10 @@ def _clean_execution_date(execution_date: datetime) -> str:
         """
         return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")
 
-    @staticmethod
-    def _group_logs_by_host(logs):
+    def _group_logs_by_host(self, logs):
         grouped_logs = defaultdict(list)
         for log in logs:
-            key = getattr(log, 'host', 'default_host')
+            key = getattr(log, self.host_field, 'default_host')
             grouped_logs[key].append(log)
 
         # return items sorted by timestamp.
@@ -160,7 +164,7 @@ def _read(
         logs = self.es_read(log_id, offset, metadata)
         logs_by_host = self._group_logs_by_host(logs)
 
-        next_offset = offset if not logs else logs[-1].offset
+        next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])
 
         # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
         # on the client. Sending as a string prevents this issue.
@@ -227,14 +231,16 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
         :type metadata: dict
         """
         # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client).query('match_phrase', log_id=log_id).sort('offset')
+        search = Search(using=self.client).query('match_phrase', log_id=log_id).sort(self.offset_field)
 
-        search = search.filter('range', offset={'gt': int(offset)})
+        search = search.filter('range', **{self.offset_field: {'gt': int(offset)}})
         max_log_line = search.count()
         if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata:
             try:
                 if max_log_line > 0:
-                    metadata['max_offset'] = search[max_log_line - 1].execute()[-1].offset
+                    metadata['max_offset'] = attrgetter(self.offset_field)(
+                        search[max_log_line - 1].execute()[-1]
+                    )
                 else:
                     metadata['max_offset'] = 0
             except Exception:  # pylint: disable=broad-except
diff --git a/tests/providers/elasticsearch/log/test_es_task_handler.py b/tests/providers/elasticsearch/log/test_es_task_handler.py
index 8a3a7a2b46108..40257221d12e4 100644
--- a/tests/providers/elasticsearch/log/test_es_task_handler.py
+++ b/tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -38,7 +38,7 @@
 from .elasticmock import elasticmock
 
 
-class TestElasticsearchTaskHandler(unittest.TestCase):
+class TestElasticsearchTaskHandler(unittest.TestCase):  # pylint: disable=too-many-instance-attributes
     DAG_ID = 'dag_for_testing_file_task_handler'
     TASK_ID = 'task_for_testing_file_log_handler'
     EXECUTION_DATE = datetime(2016, 1, 1)
@@ -54,6 +54,8 @@ def setUp(self):
         self.write_stdout = False
         self.json_format = False
         self.json_fields = 'asctime,filename,lineno,levelname,message,exc_text'
+        self.host_field = 'host'
+        self.offset_field = 'offset'
         self.es_task_handler = ElasticsearchTaskHandler(
             self.local_log_location,
             self.filename_template,
@@ -62,6 +64,8 @@ def setUp(self):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
         )
 
         self.es = elasticsearch.Elasticsearch(  # pylint: disable=invalid-name
@@ -103,6 +107,8 @@ def test_client_with_config(self):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
             es_kwargs=es_conf,
         )
 
@@ -276,6 +282,55 @@ def test_read_with_json_format(self):
         )
         assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]
 
+    def test_read_with_json_format_with_custom_offset_and_host_fields(self):
+        ts = pendulum.now()
+        formatter = logging.Formatter(
+            '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s'
+        )
+        self.es_task_handler.formatter = formatter
+        self.es_task_handler.json_format = True
+        self.es_task_handler.host_field = "host.name"
+        self.es_task_handler.offset_field = "log.offset"
+
+        self.body = {
+            'message': self.test_message,
+            'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
+            'log': {'offset': 1},
+            'host': {'name': 'somehostname'},
+            'asctime': '2020-12-24 19:25:00,962',
+            'filename': 'taskinstance.py',
+            'lineno': 851,
+            'levelname': 'INFO',
+        }
+        self.es_task_handler.set_context(self.ti)
+        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)
+
+        logs, _ = self.es_task_handler.read(
+            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+        )
+        assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]
+
+    def test_read_with_custom_offset_and_host_fields(self):
+        ts = pendulum.now()
+        # Delete the existing log entry as it doesn't have the new offset and host fields
+        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)
+
+        self.es_task_handler.host_field = "host.name"
+        self.es_task_handler.offset_field = "log.offset"
+
+        self.body = {
+            'message': self.test_message,
+            'log_id': self.LOG_ID,
+            'log': {'offset': 1},
+            'host': {'name': 'somehostname'},
+        }
+        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)
+
+        logs, _ = self.es_task_handler.read(
+            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
+        )
+        assert self.test_message == logs[0][0][1]
+
     def test_close(self):
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
         self.es_task_handler.formatter = formatter
@@ -357,6 +412,8 @@ def test_render_log_id(self):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
         )
         log_id = self.es_task_handler._render_log_id(self.ti, 1)
         assert expected_log_id == log_id
@@ -382,6 +439,8 @@ def test_get_external_log_url(self, es_frontend, expected_url):
             self.write_stdout,
             self.json_format,
             self.json_fields,
+            self.host_field,
+            self.offset_field,
             frontend=es_frontend,
         )
         url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)
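
A note on the attrgetter switch in es_task_handler.py above: Filebeat 7's
renamed fields are dotted paths (log.offset, host.name), and
operator.attrgetter resolves a dotted name by walking nested attributes,
which the old `logs[-1].offset` access (or a single getattr call) cannot
do. Below is a minimal sketch of that behavior; the SimpleNamespace
stand-ins are illustrative only, not Airflow or elasticsearch-dsl objects:

    from operator import attrgetter
    from types import SimpleNamespace

    # Stand-in for a search hit whose offset is nested the way Filebeat 7
    # ships it: {"log": {"offset": 42}} exposed via attribute access.
    hit = SimpleNamespace(log=SimpleNamespace(offset=42))

    # attrgetter("log.offset") walks the dotted path: hit.log.offset -> 42.
    get_offset = attrgetter("log.offset")
    assert get_offset(hit) == 42

    # Plain getattr looks for an attribute literally named "log.offset",
    # finds none, and falls back to the default.
    assert getattr(hit, "log.offset", "missing") == "missing"

In practice, a deployment shipping task logs through Filebeat 7 would set
`host_field = host.name` and `offset_field = log.offset` under
[elasticsearch] in airflow.cfg; the defaults (`host` and `offset`) keep
existing pre-Filebeat-7 pipelines working unchanged.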