Support restricted index patterns in Elasticsearch log handler (#23888)
Sometimes Airflow doesn't have the ability to search across all indices
in an Elasticsearch server. This might be due to security settings on
the server. In these cases, fetching the remote logs fails. To fix this,
we add an `index_patterns` configuration setting that can be set to a
more restrictive pattern.
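
For example, a deployment whose Elasticsearch credentials only cover its own log indices could set something like this in airflow.cfg (the index name here is purely illustrative):

[elasticsearch]
index_patterns = airflow-task-logs-*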
kouk committed Dec 8, 2022
1 parent 68217f5 commit 99bbcd3
Showing 5 changed files with 91 additions and 7 deletions.
7 changes: 7 additions & 0 deletions airflow/config_templates/config.yml
@@ -2317,6 +2317,13 @@
type: string
example: ~
default: "offset"
- name: index_patterns
description: |
Comma separated list of index patterns to use when searching for logs (default: `_all`).
version_added: 2.6.0
type: string
example: something-*
default: "_all"
- name: elasticsearch_configs
description: ~
options:
4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -1163,6 +1163,10 @@ host_field = host
# The field where offset is stored (normally either `offset` or `log.offset`)
offset_field = offset

# Comma separated list of index patterns to use when searching for logs (default: `_all`).
# Example: index_patterns = something-*
index_patterns = _all

[elasticsearch_configs]
use_ssl = False
verify_certs = True
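
As with other Airflow options, the same setting can also be supplied through the standard AIRFLOW__{SECTION}__{KEY} environment-variable convention, e.g.:

AIRFLOW__ELASTICSEARCH__INDEX_PATTERNS=something-*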
8 changes: 7 additions & 1 deletion airflow/providers/elasticsearch/log/es_task_handler.py
@@ -83,6 +83,7 @@ def __init__(
offset_field: str = "offset",
host: str = "localhost:9200",
frontend: str = "localhost:5601",
index_patterns: str | None = conf.get("elasticsearch", "index_patterns", fallback="_all"),
es_kwargs: dict | None = conf.getsection("elasticsearch_configs"),
*,
filename_template: str | None = None,
@@ -114,6 +115,7 @@ def __init__(
self.json_fields = [label.strip() for label in json_fields.split(",")]
self.host_field = host_field
self.offset_field = offset_field
self.index_patterns = index_patterns
self.context_set = False

self.formatter: logging.Formatter
@@ -282,7 +284,11 @@ def es_read(self, log_id: str, offset: str, metadata: dict) -> list:
:param metadata: log metadata, used for streaming log download.
"""
# Offset is the unique key for sorting logs given log_id.
search = Search(using=self.client).query("match_phrase", log_id=log_id).sort(self.offset_field)
search = (
Search(index=self.index_patterns, using=self.client)
.query("match_phrase", log_id=log_id)
.sort(self.offset_field)
)

search = search.filter("range", **{self.offset_field: {"gt": int(offset)}})
max_log_line = search.count()
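
A minimal standalone sketch of the restricted query with elasticsearch-dsl, assuming a reachable cluster at localhost:9200; the log_id value and field names are made up, and the pattern string keeps the same comma-separated form the handler accepts:

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search

client = Elasticsearch("http://localhost:9200")  # illustrative connection

# Search only the configured index patterns instead of the default _all.
search = (
    Search(index="test_*,other_*", using=client)
    .query("match_phrase", log_id="example_dag-example_task-2022-12-08-1")  # made-up log_id
    .sort("offset")
    .filter("range", offset={"gt": 0})  # only lines after the last offset already read
)
print(search.count())  # number of matching log documents in the allowed indices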
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import fnmatch
import json

from elasticsearch import Elasticsearch
@@ -363,6 +364,19 @@ def match_must_phrase(document, matches, must):
# use in as a proxy for match_phrase
matches.append(document)

    # Check that the requested index(es) exist.
def _validate_search_targets(self, targets):
# TODO: support allow_no_indices query parameter
matches = set()
for target in targets:
if target == "_all" or target == "":
matches.update(self.__documents_dict)
elif "*" in target:
matches.update(fnmatch.filter(self.__documents_dict, target))
elif target not in self.__documents_dict:
raise NotFoundError(404, f"IndexMissingException[[{target}] missing]")
return matches
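
The wildcard handling above leans only on the standard library; a quick illustration of fnmatch.filter against some made-up index names:

import fnmatch

indices = ["test_index", "test_other", "prod_logs"]  # made-up index names

print(fnmatch.filter(indices, "test_*"))    # ['test_index', 'test_other']
print(fnmatch.filter(indices, "prod_*"))    # ['prod_logs']
print(fnmatch.filter(indices, "missing*"))  # [] -> an unmatched wildcard yields no hits rather than an error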

def _normalize_index_to_list(self, index):
# Ensure to have a list of index
if index is None:
@@ -375,12 +389,11 @@ def _normalize_index_to_list(self, index):
# Is it the correct exception to use ?
raise ValueError("Invalid param 'index'")

# Check index(es) exists
for searchable_index in searchable_indexes:
if searchable_index not in self.__documents_dict:
raise NotFoundError(404, f"IndexMissingException[[{searchable_index}] missing]")

return searchable_indexes
return list(
self._validate_search_targets(
target for index in searchable_indexes for target in index.split(",")
)
)

@staticmethod
def _normalize_doc_type_to_list(doc_type):
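
The generator in the new return statement simply flattens any comma-separated entries before validation, e.g.:

searchable_indexes = ["test_*,other_*", "_all"]
targets = [t for index in searchable_indexes for t in index.split(",")]
# ['test_*', 'other_*', '_all']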
54 changes: 54 additions & 0 deletions tests/providers/elasticsearch/log/test_es_task_handler.py
@@ -105,6 +105,7 @@ def teardown_method(self):

def test_client(self):
assert isinstance(self.es_task_handler.client, elasticsearch.Elasticsearch)
assert self.es_task_handler.index_patterns == "_all"

def test_client_with_config(self):
es_conf = dict(conf.getsection("elasticsearch_configs"))
@@ -125,6 +126,21 @@ def test_client_with_config(self):
es_kwargs=es_conf,
)

def test_client_with_patterns(self):
# ensure creating with index patterns does not fail
patterns = "test_*,other_*"
handler = ElasticsearchTaskHandler(
base_log_folder=self.local_log_location,
end_of_log_mark=self.end_of_log_mark,
write_stdout=self.write_stdout,
json_format=self.json_format,
json_fields=self.json_fields,
host_field=self.host_field,
offset_field=self.offset_field,
index_patterns=patterns,
)
assert handler.index_patterns == patterns

def test_read(self, ti):
ts = pendulum.now()
logs, metadatas = self.es_task_handler.read(
@@ -139,6 +155,44 @@ def test_read(self, ti):
assert "1" == metadatas[0]["offset"]
assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts

def test_read_with_patterns(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="test_*,other_*"):
logs, metadatas = self.es_task_handler.read(
ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
)

assert 1 == len(logs)
assert len(logs) == len(metadatas)
assert len(logs[0]) == 1
assert self.test_message == logs[0][0][-1]
assert not metadatas[0]["end_of_log"]
assert "1" == metadatas[0]["offset"]
assert timezone.parse(metadatas[0]["last_log_timestamp"]) > ts

def test_read_with_patterns_no_match(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="test_other_*,test_another_*"):
logs, metadatas = self.es_task_handler.read(
ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
)

assert 1 == len(logs)
assert len(logs) == len(metadatas)
assert [[]] == logs
assert not metadatas[0]["end_of_log"]
assert "0" == metadatas[0]["offset"]
# last_log_timestamp won't change if no log lines read.
assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts

def test_read_with_missing_index(self, ti):
ts = pendulum.now()
with mock.patch.object(self.es_task_handler, "index_patterns", new="nonexistent,test_*"):
with pytest.raises(elasticsearch.exceptions.NotFoundError, match=r".*nonexistent.*"):
self.es_task_handler.read(
ti, 1, {"offset": 0, "last_log_timestamp": str(ts), "end_of_log": False}
)

@pytest.mark.parametrize("seconds", [3, 6])
def test_read_missing_logs(self, seconds, create_task_instance):
"""
