
Airflow2.0.2 --- TypeError: unhashable type: 'AttrDict' while trying to read logs from elasticsearch #15613

Closed
Pravka opened this issue Apr 30, 2021 · 14 comments
Labels
affected_version:2.0 Issues Reported for 2.0 area:logging area:providers kind:bug This is a clearly a bug

Comments


Pravka commented Apr 30, 2021

Hi,

I am experiencing issues with reading logs from Elasticsearch; I'm not sure whether it's a bug or my incompetence!

Apache Airflow version: 2.0.2
Elastic version: v 7.9.3
Kubernetes version: v1.19.6
Environment: Dev Kubernetes

  • Cloud provider or hardware configuration: AWS
  • OS: Debian GNU/Linux 10 (buster)
  • Kernel: Linux airflow-6d7d4568c-w7plk 4.14.138-rancher #1 SMP Sat Aug 10 11:25:46 UTC 2019 x86_64 GNU/Linux
  • Install tools: Kubernetes manifest files -- using Airflow docker image apache/airflow:2.0.2-python3.8-build

What happened:
I am running Airflow with the Celery Executor inside a Kubernetes cluster, which runs Spark jobs via KubernetesPodOperator. I have 2 pods:

NAME                                                   READY   STATUS             RESTARTS   AGE
airflow-6d7d4568c-w7plk                                4/4     Running            0          18h
airflow-worker-5597c8cc8-nlpv9                         2/2     Running            0          18h

The airflow pod consists of the airflow-ui, airflow-scheduler, airflow-flower and aws-s3-sync containers (the latter used to sync DAGs from S3).
The airflow-worker pod consists of the airflow-celery-worker and aws-s3-sync containers.

For now, I am trying to execute a DAG which runs spark-submit --version using KubernetesPodOperator. The DAG executes and the logs are present in the container stdout.

I use Filebeat to pick up the logs and enrich them with "add_cloud_metadata" and "add_host_metadata". Afterwards, the logs are sent to Logstash for field adjustments, because Airflow writes logs to Elasticsearch in one format and tries to read them back in another; this particularly applies to the execution_date field. Anyhow, the logs are visible in Kibana, so I parsed the fields and assembled the log_id field so that Airflow can read it, which I confirmed by running a query in the Kibana console.

Following up on the execution_date field: it seems that when Airflow writes logs to Elasticsearch while running in Kubernetes, the fields are not written to Elasticsearch as dag_id, task_id, execution_date and try_number, but rather as [kubernetes][labels][dag_id] and so on. So, if I assemble the log_id field manually from the [kubernetes][labels]* fields, an example field looks like this:
log_id spark-submit-spark-submit-2021-04-28T110330.1402290000-3c11bfafa-1
which is incorrect by default because, while reading logs, Airflow tries to fetch:
log_id spark-submit-spark-submit-2021-04-28T11:03:30.140229+00:00-1
I am not sure whether this is something that needs improving or whether it is expected behaviour. In my opinion it should not be expected: due to vague documentation with no thorough explanation of what really happens, users have to invest hours in getting to the bottom of the issue and working out a solution on their own.
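
For illustration, here is a minimal Python sketch (hypothetical, not Airflow code) of why the two forms never match: Kubernetes label values cannot contain characters such as ':' or '+', so the execution date stored in the pod labels is a sanitized string with an extra suffix, while Airflow renders log_id_template with the full ISO-8601 execution_date.

    # Minimal sketch (not Airflow code): the log_id Airflow queries for vs. the
    # one assembled from Kubernetes pod labels.
    LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{execution_date}-{try_number}"

    # What Airflow renders when reading logs (ISO-8601 execution_date):
    airflow_log_id = LOG_ID_TEMPLATE.format(
        dag_id="spark-submit",
        task_id="spark-submit",
        execution_date="2021-04-28T11:03:30.140229+00:00",
        try_number=1,
    )
    # -> spark-submit-spark-submit-2021-04-28T11:03:30.140229+00:00-1

    # What you get from the sanitized [kubernetes][labels][execution_date] value:
    label_log_id = LOG_ID_TEMPLATE.format(
        dag_id="spark-submit",
        task_id="spark-submit",
        execution_date="2021-04-28T110330.1402290000-3c11bfafa",
        try_number=1,
    )
    # -> spark-submit-spark-submit-2021-04-28T110330.1402290000-3c11bfafa-1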

After parsing execution_date to match what Airflow tries to fetch, I had to enable fielddata on the offset field in Elasticsearch because Airflow couldn't sort the offsets. After that, the error below happened.

While following the Airflow logs as it tries to read the task log from Elasticsearch, the error below pops up:

[2021-04-30 09:47:23,421] {app.py:1891} ERROR - Exception on /get_logs_with_metadata [GET]
Traceback (most recent call last):
  File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/root/.local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/root/.local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/root/.local/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/root/.local/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/root/.local/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/root/.local/lib/python3.8/site-packages/airflow/www/views.py", line 1068, in get_logs_with_metadata
    logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
  File "/root/.local/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/root/.local/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
    log, metadata = self._read(task_instance, try_number_element, metadata)
  File "/root/.local/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 161, in _read
    logs_by_host = self._group_logs_by_host(logs)
  File "/root/.local/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 130, in _group_logs_by_host
    grouped_logs[key].append(log)
TypeError: unhashable type: 'AttrDict'
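
For context on the traceback: _group_logs_by_host builds a dict keyed by each log document's host field. A string (or a missing/None value) is a perfectly fine key, but once that field holds a nested object, such as the one produced by Filebeat's add_host_metadata, the key is dict-like and unhashable. A simplified sketch of the failure mode (not the provider's actual code):

    from collections import defaultdict

    # Simplified reproduction of the grouping step: documents are grouped
    # into a dict keyed by their "host" field.
    def group_logs_by_host(logs):
        grouped = defaultdict(list)
        for log in logs:
            key = log.get("host")  # in the provider this comes back as an AttrDict
            grouped[key].append(log)
        return grouped

    # Works: "host" is a plain string, which is hashable.
    group_logs_by_host([{"host": "airflow-worker-5597c8cc8-nlpv9", "message": "ok"}])

    # Fails: "host" has been replaced by a nested object, which is unhashable,
    # mirroring the "unhashable type: 'AttrDict'" error above.
    group_logs_by_host([{"host": {"name": "worker", "os": {"family": "debian"}}}])
    # TypeError: unhashable type: 'dict'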

What you expected to happen: the Airflow UI to display the task logs.

How to reproduce it:
Spin up a Kubernetes cluster, deploy Airflow with the CeleryExecutor in it, use Filebeat to pick up the logs and send them through Logstash to Elasticsearch. Run any job using KubernetesPodOperator and try to check the task logs in the Airflow UI. The task log view should spin until timeout and then display a blank page.

Relevant information/configuration settings:

airflow.cfg:

AIRFLOW__ELASTICSEARCH__END_OF_LOG_MARK: end_of_log
AIRFLOW__ELASTICSEARCH__FRONTEND: elastic:pass@***.***.svc.cluster.local:443/{log_id}
AIRFLOW__ELASTICSEARCH__HOST: elastic:pass@***.***.svc.cluster.local:9200
AIRFLOW__ELASTICSEARCH__JSON_FIELDS: asctime, filename, lineno, levelname, message
AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True"
AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE: '{dag_id}-{task_id}-{execution_date}-{try_number}'
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT: "True"
AIRFLOW__ELASTICSEARCH_CONFIGS__CA_CERTS: /opt/certs/ca.crt
AIRFLOW__ELASTICSEARCH_CONFIGS__USE_SSL: "True"
AIRFLOW__ELASTICSEARCH_CONFIGS__VERIFY_CERTS: "True"
AIRFLOW__LOGGING__REMOTE_LOGGING: "True"

filebeat.yml:

    filebeat.autodiscover:
      providers:
        - type: kubernetes
          node: ${NODE_NAME}
          hints.enabled: true
          hints.default_config:
            type: container
            paths:
              - /var/log/containers/*${data.kubernetes.container.id}.log
    processors:
      - add_cloud_metadata:
      - add_host_metadata:
    
    output.logstash:
      hosts: '***.***.svc.cluster.local:5044'

logstash.conf:

    input {
      beats {
        port => 5044
      }
    }
    filter {
      if [kubernetes][labels][dag_id] and [kubernetes][labels][task_id] and [kubernetes][labels][execution_date] and [kubernetes][labels][try_number] {
        mutate {
          gsub => [
            "[kubernetes][labels][execution_date]", "^([0-9a-z][0-9a-z][0-9a-z][0-9a-z]-[0-9a-z][0-9a-z]-[0-9a-z][0-9a-z]T)([0-9a-z][0-9a-z])([0-9a-z][0-9a-z])([0-9a-z][0-9a-z])(.[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z])*([0-9a-z][0-9a-z])([0-9a-z][0-9a-z])(?:-[0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z][0-9a-z])?", "\1\2:\3:\4\5+\6:\7"
          ]
          add_field => {
            "offset" => "%{[log][offset]}"
            "log_id" => "%{[kubernetes][labels][dag_id]}-%{[kubernetes][labels][task_id]}-%{[kubernetes][labels][execution_date]}-%{[kubernetes][labels][try_number]}"
          }
        }
      }
    }
    output {
      elasticsearch {
        index => "logstash-%{[@metadata][beat]}"
        hosts => [ "https://***.***.svc.cluster.local:9200" ]
        user => "elastic"
        password => "${ES_PASSWORD}"
        cacert => '/etc/logstash/certificates/ca.crt'
      }
    }
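
As a sanity check on the gsub above, here is the intended transformation expressed in plain Python (a hypothetical helper, assuming the label value always carries six microsecond digits and a +00:00-style offset, as in the example above):

    import re

    # Hypothetical helper mirroring the Logstash gsub: turn the Kubernetes-label
    # form of execution_date back into the ISO-8601 form Airflow queries for.
    def restore_execution_date(label_value: str) -> str:
        # e.g. "2021-04-28T110330.1402290000-3c11bfafa"
        #   -> "2021-04-28T11:03:30.140229+00:00"
        m = re.match(
            r"^(\d{4}-\d{2}-\d{2}T)(\d{2})(\d{2})(\d{2})"
            r"\.(\d{6})(\d{2})(\d{2})(?:-[0-9a-z]+)?$",
            label_value,
        )
        if not m:
            return label_value  # leave anything unexpected untouched
        date, hh, mm, ss, micros, tz_h, tz_m = m.groups()
        return f"{date}{hh}:{mm}:{ss}.{micros}+{tz_h}:{tz_m}"

    print(restore_execution_date("2021-04-28T110330.1402290000-3c11bfafa"))
    # 2021-04-28T11:03:30.140229+00:00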

Final thoughts:
I am not sure whether I missed something while setting this up following https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/logging.html, or whether reading logs from Elasticsearch is something the Airflow crew needs to improve.

@Pravka Pravka added the kind:bug This is a clearly a bug label Apr 30, 2021

boring-cyborg bot commented Apr 30, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@Pravka Pravka changed the title Airflow2.0.2 Airflow2.0.2 --- TypeError: unhashable type: 'AttrDict' while trying to read logs from elasticsearch Apr 30, 2021
@eladkal eladkal added the affected_version:2.0 Issues Reported for 2.0 label Oct 10, 2021
Member

kaxil commented Dec 30, 2021

@jedcunningham Any ideas here -- I know you worked with logstash & ES a bit on a couple of PRs, does it ring a bell?

Contributor

eladkal commented Mar 14, 2022

@Pravka does the issue still happen in the latest Airflow version?

Contributor

eladkal commented Mar 14, 2022

There is also a pending PR to improve Elasticsearch logging: #21942
@millin can you confirm whether it will also address this issue?

Contributor

millin commented Mar 16, 2022

I think I saw this error about a year ago, but it didn't appear in newer versions.
@Pravka which versions of the Python packages apache-airflow-providers-elasticsearch, elasticsearch and elasticsearch-dsl are installed?

@github-actions

This issue has been automatically marked as stale because it has been open for 30 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Apr 17, 2022
@cccs-seb
Contributor

If anyone else is facing this issue, I found it was caused by using the wrong 'host_field' in the Elasticsearch logging configuration.

When I changed it from the default host to host.hostname, the issue fixed itself. Seems like a Filebeat 7 breaking change: #14625


NiklasBeierl commented Apr 20, 2022

The problem is the combination of the add_host_metadata processor, some infuriating default behaviour of Filebeat, and AIRFLOW__ELASTICSEARCH__JSON_FORMAT: "True".

Due to the json_format setting, the logs from Airflow are JSON and may already contain a host field, which is needed to display the logs correctly (see here).

As the documentation of add_host_metadata says, it will by default simply override the host field in the document with an object. The AF webserver cannot handle an object here.

I am not seeing the "host" field actually getting written by Airflow when using the Celery executor. I guess it's only there when using Dask? So in this case the workaround is quite simple: rename the "host" field to something else:

# In filebeat.yml
processors:
  - add_host_metadata: # This (over)writes the "host" field 
  - rename: 
      fields:
        - from: "host" # Still want this info, but I can't use the "host" field for it
          to: "host_meta"  

There is no host field in any of the records, and the code can deal with a null just fine since it's hashable.
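
(A tiny illustrative check of that last claim, assuming plain dict-based grouping as sketched earlier: None is hashable and works fine as a key.)

    from collections import defaultdict

    # None is hashable, so grouping documents without a "host" field works fine.
    grouped = defaultdict(list)
    grouped[None].append({"message": "log line without a host field"})
    print(dict(grouped))  # {None: [{'message': 'log line without a host field'}]}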

The longer story:

If you actually do want to preserve the original value of the "host" field (I am guessing Airflow puts a string there), it gets a bit more complicated. Originally I wanted to preserve the host field in my proposed solution; it would have looked like this:

# WARNING, THIS WILL NOT WORK!
processors:
  - rename: # The next proc will overwrite the host field but its needed by the af webserver ...
      fields:
        - from: "host" # ... so lets just store it somewhere else
          to: "airflow_log_host" # With some AF executors there is no host-field and this will just be a NoOp
  - add_host_metadata: # This writes to the "host" field 
  - rename: 
      fail_on_error: false # This is needed to move host to host_meta even if airflow_log_host doesn't exist
      fields:
        - from: "host" # Still want this info, but I can't use the "host" field for it
          to: "host_meta"  
        - from: "airflow_log_host" # Move back the original value to the host field
          to: "host"

But it turns out that BEFORE applying any processors, Filebeat will already overwrite host with an object that looks like this:
{ "name": "<hostname>" }, and apparently this behaviour cannot be turned off.
In order to work around this, you need to use something like this:

filebeat.inputs:
  - type: log
    paths:
       - .... .json
     # JSON expansion is done in processors, DO NOT TURN IT ON HERE!
    # json.keys_under_root: true
    # json.overwrite_keys: true
    # json.add_error_key: true
    # json.expand_keys: true

# .... 

processors:
  - drop_fields: # First get rid of the "built in" host field
      fields: 
        - host
  - decode_json_fields: # Expand our JSON log to the root
      fields: 
        - message # This holds a line of JSON as string
      process_array: true
      target: "" # Store at the root
      overwrite_keys: true # message attribute will be overwritten with the message from airflow
  - rename: # The next proc will overwrite the host field which is needed by the AF webserver ...
      fields:
        - from: "host" # ... so lets just store it somewhere else
          to: "airflow_log_host" # With some AF executors there is no host-field and this will just be a NoOp
  - add_host_metadata: # This writes to the "host" field 
  - rename: 
      fail_on_error: false # This is needed to move host to host_meta even if airflow_log_host doesn't exist
      fields:
        - from: "host" # Still want this info, but I can't use the "host" field for it
          to: "host_meta"  
        - from: "airflow_log_host" # Move back the original value to the host field
          to: "host"

@eladkal eladkal removed stale Stale PRs per the .github/workflows/stale.yml policy file pending-response labels Apr 21, 2022
NiklasBeierl added a commit to NiklasBeierl/airflow that referenced this issue May 13, 2022
Contributor

abhishekbhakat commented Nov 17, 2022

This still exists with apache-airflow==2.4.3+astro.1 and apache-airflow-providers-elasticsearch==4.2.1.
I'm using Opensearch 2.4.0 with Logstash, and the log codec is JSON.

Member

potiuk commented Nov 17, 2022

Does #15613 (comment) fix the problem for you? It seems this is not an Airflow problem but a Filebeat one, and you need to apply some fixes to Filebeat.

Contributor

abhishekbhakat commented Nov 17, 2022

I'm not using Filebeat at all, only Logstash, reading the log file directly as input. And I'm using the CeleryExecutor.

Member

potiuk commented Nov 17, 2022

So I guess you should describe your configuration.

From the description above it looks like it was caused by Filebeat. Can you please provide details of your configuration (what you have configured and how, the exact stack trace, etc.)? That might help whoever looks into the issue. The original issue was raised against 2.0.2, but having evidence from the most recent versions of both the provider and Airflow might be super helpful. It seems that the problem is caused by the configuration of some Elasticsearch integration and does not exist when you use Elasticsearch "as is".

It might also help you understand how to change the configuration. Previously, I think the difficulty was that Filebeat was involved and people were not able, or did not want, to reproduce the issue. If you provide an easily reproducible configuration and the circumstances in which it happens, there is a better chance someone will be able to reproduce it.

Contributor

abhishekbhakat commented Nov 17, 2022

Found it!
By default, Opensearch was changing the host field to a dict. Needed to set AIRFLOW__ELASTICSEARCH__HOST_FIELD=host.name as per #14625 to fix it.

Member

potiuk commented Nov 17, 2022

Ah cool. I rather hoped this would happen once you looked closely. Let me just close this one then, since we have a good solution and have confirmed it works.

@potiuk potiuk closed this as completed Nov 17, 2022