Support remote logging in elasticsearch with filebeat 7 #14625

Merged (4 commits) on Jun 11, 2021
4 changes: 4 additions & 0 deletions airflow/config_templates/airflow_local_settings.py
@@ -254,6 +254,8 @@
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean('elasticsearch', 'WRITE_STDOUT')
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean('elasticsearch', 'JSON_FORMAT')
ELASTICSEARCH_JSON_FIELDS: str = conf.get('elasticsearch', 'JSON_FIELDS')
ELASTICSEARCH_HOST_FIELD: str = conf.get('elasticsearch', 'HOST_FIELD')
ELASTICSEARCH_OFFSET_FIELD: str = conf.get('elasticsearch', 'OFFSET_FIELD')

ELASTIC_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
    'task': {
@@ -268,6 +270,8 @@
        'write_stdout': ELASTICSEARCH_WRITE_STDOUT,
        'json_format': ELASTICSEARCH_JSON_FORMAT,
        'json_fields': ELASTICSEARCH_JSON_FIELDS,
        'host_field': ELASTICSEARCH_HOST_FIELD,
        'offset_field': ELASTICSEARCH_OFFSET_FIELD,
    },
}

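Downstream, Airflow hands this module's logging config to logging.config.dictConfig, so each key in the 'task' mapping above becomes a keyword argument to ElasticsearchTaskHandler.__init__; adding 'host_field' and 'offset_field' here is all the wiring the handler needs. A minimal, self-contained sketch of that mechanism, using a hypothetical DemoHandler rather than Airflow code:

import logging.config

class DemoHandler(logging.Handler):
    """Hypothetical stand-in for ElasticsearchTaskHandler."""

    def __init__(self, host_field="host", offset_field="offset"):
        super().__init__()
        self.host_field = host_field
        self.offset_field = offset_field

# dictConfig passes every key other than 'class', 'level', 'formatter'
# and 'filters' to the handler's constructor as keyword arguments,
# which is how 'host_field' and 'offset_field' reach __init__.
# Run as a script so '__main__.DemoHandler' resolves.
logging.config.dictConfig(
    {
        'version': 1,
        'handlers': {
            'task': {
                'class': '__main__.DemoHandler',
                'host_field': 'host.name',
                'offset_field': 'log.offset',
            }
        },
    }
)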
14 changes: 14 additions & 0 deletions airflow/config_templates/config.yml
@@ -1999,6 +1999,20 @@
      type: string
      example: ~
      default: "asctime, filename, lineno, levelname, message"
    - name: host_field
      description: |
        The field where host name is stored (normally either `host` or `host.name`)
      version_added: 2.1.1
      type: string
      example: ~
      default: "host"
    - name: offset_field
      description: |
        The field where offset is stored (normally either `offset` or `log.offset`)
      version_added: 2.1.1
      type: string
      example: ~
      default: "offset"
    - name: elasticsearch_configs
      description: ~
      options:
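The new options exist because Filebeat 7 writes documents in the Elastic Common Schema, where the host name and offset are nested under `host.name` and `log.offset` rather than stored as flat `host` and `offset` keys. A hypothetical ECS-shaped document, mirroring the fixtures in the new tests below (values illustrative):

ecs_doc = {
    'message': 'some log line',
    'log': {'offset': 1},              # read via offset_field = "log.offset"
    'host': {'name': 'somehostname'},  # read via host_field = "host.name"
}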
6 changes: 6 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -992,6 +992,12 @@ json_format = False
# Log fields to also attach to the json output, if enabled
json_fields = asctime, filename, lineno, levelname, message

# The field where host name is stored (normally either `host` or `host.name`)
host_field = host

# The field where offset is stored (normally either `offset` or `log.offset`)
offset_field = offset

[elasticsearch_configs]
use_ssl = False
verify_certs = True
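For a Filebeat 7 pipeline both defaults would be overridden; a minimal sketch of the corresponding airflow.cfg override, assuming ECS field names:

[elasticsearch]
host_field = host.name
offset_field = log.offset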
20 changes: 13 additions & 7 deletions airflow/providers/elasticsearch/log/es_task_handler.py
@@ -20,6 +20,7 @@
import sys
from collections import defaultdict
from datetime import datetime
from operator import attrgetter
from time import time
from typing import List, Optional, Tuple
from urllib.parse import quote
@@ -71,6 +72,8 @@ def __init__(  # pylint: disable=too-many-arguments
        write_stdout: bool,
        json_format: bool,
        json_fields: str,
        host_field: str = "host",
        offset_field: str = "offset",
        host: str = "localhost:9200",
        frontend: str = "localhost:5601",
        es_kwargs: Optional[dict] = conf.getsection("elasticsearch_configs"),
@@ -94,6 +97,8 @@ def __init__(  # pylint: disable=too-many-arguments
        self.write_stdout = write_stdout
        self.json_format = json_format
        self.json_fields = [label.strip() for label in json_fields.split(",")]
        self.host_field = host_field
        self.offset_field = offset_field
        self.handler = None
        self.context_set = False

@@ -122,11 +127,10 @@ def _clean_execution_date(execution_date: datetime) -> str:
"""
return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")

@staticmethod
def _group_logs_by_host(logs):
def _group_logs_by_host(self, logs):
grouped_logs = defaultdict(list)
for log in logs:
key = getattr(log, 'host', 'default_host')
key = getattr(log, self.host_field, 'default_host')
grouped_logs[key].append(log)

# return items sorted by timestamp.
@@ -160,7 +164,7 @@ def _read(
        logs = self.es_read(log_id, offset, metadata)
        logs_by_host = self._group_logs_by_host(logs)

-        next_offset = offset if not logs else logs[-1].offset
+        next_offset = offset if not logs else attrgetter(self.offset_field)(logs[-1])

        # Ensure a string here. Large offset numbers will get JSON.parsed incorrectly
        # on the client. Sending as a string prevents this issue.
@@ -227,14 +231,16 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
        :type metadata: dict
        """
        # Offset is the unique key for sorting logs given log_id.
-        search = Search(using=self.client).query('match_phrase', log_id=log_id).sort('offset')
+        search = Search(using=self.client).query('match_phrase', log_id=log_id).sort(self.offset_field)

-        search = search.filter('range', offset={'gt': int(offset)})
+        search = search.filter('range', **{self.offset_field: {'gt': int(offset)}})
        max_log_line = search.count()
        if 'download_logs' in metadata and metadata['download_logs'] and 'max_offset' not in metadata:
            try:
                if max_log_line > 0:
-                    metadata['max_offset'] = search[max_log_line - 1].execute()[-1].offset
+                    metadata['max_offset'] = attrgetter(self.offset_field)(
+                        search[max_log_line - 1].execute()[-1]
+                    )
                else:
                    metadata['max_offset'] = 0
            except Exception:  # pylint: disable=broad-except
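The move from plain attribute access to operator.attrgetter is what makes dotted field names work: attrgetter('log.offset') resolves each path component in turn on the nested attributes an elasticsearch-dsl hit exposes, while getattr(hit, 'log.offset') would look for a single attribute literally named 'log.offset'. The range filter likewise switches to **{self.offset_field: {'gt': int(offset)}} because a dotted name cannot be passed as a Python keyword argument. One caveat worth noting: _group_logs_by_host still uses plain getattr, so a dotted host_field appears to fall back to the 'default_host' group, which is harmless when all lines share one host. A self-contained sketch, using SimpleNamespace as a stand-in for a search hit:

from operator import attrgetter
from types import SimpleNamespace

# Stand-in for an elasticsearch-dsl hit carrying an ECS-style nested field.
hit = SimpleNamespace(log=SimpleNamespace(offset=42))

assert attrgetter("log.offset")(hit) == 42                  # resolves hit.log.offset
assert getattr(hit, "log.offset", "missing") == "missing"   # getattr does not split on dots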
61 changes: 60 additions & 1 deletion tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -38,7 +38,7 @@
from .elasticmock import elasticmock


-class TestElasticsearchTaskHandler(unittest.TestCase):
+class TestElasticsearchTaskHandler(unittest.TestCase):  # pylint: disable=too-many-instance-attributes
    DAG_ID = 'dag_for_testing_file_task_handler'
    TASK_ID = 'task_for_testing_file_log_handler'
    EXECUTION_DATE = datetime(2016, 1, 1)
@@ -54,6 +54,8 @@ def setUp(self):
        self.write_stdout = False
        self.json_format = False
        self.json_fields = 'asctime,filename,lineno,levelname,message,exc_text'
        self.host_field = 'host'
        self.offset_field = 'offset'
        self.es_task_handler = ElasticsearchTaskHandler(
            self.local_log_location,
            self.filename_template,
@@ -62,6 +64,8 @@ def setUp(self):
            self.write_stdout,
            self.json_format,
            self.json_fields,
            self.host_field,
            self.offset_field,
        )

        self.es = elasticsearch.Elasticsearch(  # pylint: disable=invalid-name
@@ -103,6 +107,8 @@ def test_client_with_config(self):
            self.write_stdout,
            self.json_format,
            self.json_fields,
            self.host_field,
            self.offset_field,
            es_kwargs=es_conf,
        )

@@ -276,6 +282,55 @@ def test_read_with_json_format(self):
        )
        assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]

    def test_read_with_json_format_with_custom_offset_and_host_fields(self):
        ts = pendulum.now()
        formatter = logging.Formatter(
            '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s - %(exc_text)s'
        )
        self.es_task_handler.formatter = formatter
        self.es_task_handler.json_format = True
        self.es_task_handler.host_field = "host.name"
        self.es_task_handler.offset_field = "log.offset"

        self.body = {
            'message': self.test_message,
            'log_id': f'{self.DAG_ID}-{self.TASK_ID}-2016_01_01T00_00_00_000000-1',
            'log': {'offset': 1},
            'host': {'name': 'somehostname'},
            'asctime': '2020-12-24 19:25:00,962',
            'filename': 'taskinstance.py',
            'lineno': 851,
            'levelname': 'INFO',
        }
        self.es_task_handler.set_context(self.ti)
        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)

        logs, _ = self.es_task_handler.read(
            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
        )
        assert "[2020-12-24 19:25:00,962] {taskinstance.py:851} INFO - some random stuff - " == logs[0][0][1]

    def test_read_with_custom_offset_and_host_fields(self):
        ts = pendulum.now()
        # Delete the existing log entry as it doesn't have the new offset and host fields
        self.es.delete(index=self.index_name, doc_type=self.doc_type, id=1)

        self.es_task_handler.host_field = "host.name"
        self.es_task_handler.offset_field = "log.offset"

        self.body = {
            'message': self.test_message,
            'log_id': self.LOG_ID,
            'log': {'offset': 1},
            'host': {'name': 'somehostname'},
        }
        self.es.index(index=self.index_name, doc_type=self.doc_type, body=self.body, id=id)

        logs, _ = self.es_task_handler.read(
            self.ti, 1, {'offset': 0, 'last_log_timestamp': str(ts), 'end_of_log': False}
        )
        assert self.test_message == logs[0][0][1]

    def test_close(self):
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        self.es_task_handler.formatter = formatter
@@ -357,6 +412,8 @@ def test_render_log_id(self):
            self.write_stdout,
            self.json_format,
            self.json_fields,
            self.host_field,
            self.offset_field,
        )
        log_id = self.es_task_handler._render_log_id(self.ti, 1)
        assert expected_log_id == log_id
@@ -382,6 +439,8 @@ def test_get_external_log_url(self, es_frontend, expected_url):
            self.write_stdout,
            self.json_format,
            self.json_fields,
            self.host_field,
            self.offset_field,
            frontend=es_frontend,
        )
        url = es_task_handler.get_external_log_url(self.ti, self.ti.try_number)