
Airflow Integration: ParentRunJob not finished when DAG finishes #2182

Closed
YLibert opened this issue Oct 12, 2022 · 1 comment

YLibert commented Oct 12, 2022

Issue

When using the OpenLineage integration for Airflow, a job for the DAG is created from the parent run facet of the tasks.
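In the raw events of this issue, the DAG-level job never has events of its own. It is only referenced through the `parent` facet (a ParentRunFacet) attached to each task's START event. Here is a minimal sketch of pulling out that parent reference. It assumes events are plain dicts with the layout shown in this issue, and the helper name `parent_ref` is hypothetical:

```python
from typing import Optional, Tuple


def parent_ref(event: dict) -> Optional[Tuple[str, str, str]]:
    """Return (namespace, job name, run ID) of the parent run, or None.

    Assumes the layout seen in this issue: run.facets.parent is either
    null or an object with "job" and "run" sub-objects.
    """
    facets = event.get("run", {}).get("facets") or {}
    parent = facets.get("parent")
    if not parent:
        # The COMPLETE events in this issue carry "parent": null.
        return None
    return (
        parent["job"]["namespace"],
        parent["job"]["name"],
        parent["run"]["runId"],
    )


# Trimmed copy of the START event for task "end" from this issue.
start_end = {
    "eventType": "START",
    "run": {
        "runId": "a5ecf0bc-3af0-4bf6-b1ad-99168756fb75",
        "facets": {
            "parent": {
                "run": {"runId": "629ce79b-994d-39a4-a850-3fa47ca79ac4"},
                "job": {"namespace": "airflow_local", "name": "DAG_Name"},
            }
        },
    },
    "job": {"namespace": "airflow_local", "name": "DAG_Name.end"},
}

print(parent_ref(start_end))
# ('airflow_local', 'DAG_Name', '629ce79b-994d-39a4-a850-3fa47ca79ac4')
```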

The DAG's job itself stays in a never-ending RUNNING state, with its start and end dates set to the default Unix epoch.
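One plausible reading of the symptom is this: the task events reference the parent run ID, but no START, COMPLETE or FAIL event is ever emitted for that run itself. The backend therefore has no end state or real timestamps to store. The minimal sketch below lists parent runs that are referenced but never terminated. It assumes a list of event dicts like those in this issue, and `orphaned_parent_runs` is a hypothetical helper:

```python
TERMINAL_STATES = {"COMPLETE", "FAIL", "ABORT"}


def orphaned_parent_runs(events: list) -> set:
    """Run IDs referenced as a parent but with no terminal event of their own."""
    referenced = set()
    terminated = set()
    for event in events:
        run = event.get("run", {})
        parent = (run.get("facets") or {}).get("parent")
        if parent:
            referenced.add(parent["run"]["runId"])
        if event.get("eventType") in TERMINAL_STATES:
            terminated.add(run.get("runId"))
    return referenced - terminated


PARENT = {
    "run": {"runId": "629ce79b-994d-39a4-a850-3fa47ca79ac4"},
    "job": {"namespace": "airflow_local", "name": "DAG_Name"},
}
BEGIN_RUN = "2617c6ef-406f-4ad7-8f29-220a648f2467"
END_RUN = "a5ecf0bc-3af0-4bf6-b1ad-99168756fb75"

# Skeletons of the four task events in this issue ("begin" and "end").
events = [
    {"eventType": "START", "run": {"runId": BEGIN_RUN, "facets": {"parent": PARENT}}},
    {"eventType": "COMPLETE", "run": {"runId": BEGIN_RUN, "facets": {"parent": None}}},
    {"eventType": "START", "run": {"runId": END_RUN, "facets": {"parent": PARENT}}},
    {"eventType": "COMPLETE", "run": {"runId": END_RUN, "facets": {"parent": None}}},
]

print(orphaned_parent_runs(events))
# {'629ce79b-994d-39a4-a850-3fa47ca79ac4'}
```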


How to reproduce

I'm using Airflow 2.2.2 on Amazon MWAA with the OpenLineage integration openlineage-airflow==0.15.1.

The DAG I'm using is:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'yan',
    'depends_on_past': False,
    'start_date': datetime(2022, 10, 4),
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG(
    'DAG_Name',
    schedule_interval='@daily',
    description='GE with Spark',
    default_args=default_args,
) as dag:

    begin = BashOperator(
        task_id="begin",
        bash_command='echo begin',
    )

    end = BashOperator(
        task_id="end",
        bash_command='echo end',
    )

    # "end" runs after "begin"
    begin >> end
```

The associated raw events, obtained with GET /api/v1/events/lineage, are:

{
    "events": [
        {
            "eventType": "COMPLETE",
            "eventTime": "2022-10-12T14:39:05.654Z",
            "run": {
                "runId": "a5ecf0bc-3af0-4bf6-b1ad-99168756fb75",
                "facets": {
                    "nominalTime": null,
                    "parent": null,
                    "unknownSourceAttribute": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "unknownItems": [
                            {
                                "name": "BashOperator",
                                "type": "operator",
                                "properties": {
                                    "_dag": "<<non-serializable: DAG>>",
                                    "_log": "<<non-serializable: Logger>>",
                                    "pool": "default_pool",
                                    "label": "end",
                                    "owner": "yan",
                                    "queue": "default",
                                    "inlets": [],
                                    "params": {},
                                    "_inlets": [],
                                    "outlets": [],
                                    "retries": 0,
                                    "task_id": "end",
                                    "_outlets": [],
                                    "pool_slots": 1,
                                    "start_date": "<<non-serializable: DateTime>>",
                                    "retry_delay": "<<non-serializable: timedelta>>",
                                    "weight_rule": "downstream",
                                    "bash_command": "echo end",
                                    "do_xcom_push": true,
                                    "trigger_rule": "all_success",
                                    "email_on_retry": false,
                                    "skip_exit_code": 99,
                                    "depends_on_past": false,
                                    "executor_config": {},
                                    "output_encoding": "utf-8",
                                    "priority_weight": 1,
                                    "subprocess_hook": "<<non-serializable: SubprocessHook>>",
                                    "email_on_failure": false,
                                    "_upstream_task_ids": [],
                                    "_lock_for_execution": true,
                                    "wait_for_downstream": false,
                                    "_downstream_task_ids": [],
                                    "retry_exponential_backoff": false,
                                    "_BaseOperator__instantiated": true
                                }
                            }
                        ]
                    }
                }
            },
            "job": {
                "namespace": "airflow_local",
                "name": "DAG_Name.end",
                "facets": {
                    "documentation": null,
                    "sourceCodeLocation": null,
                    "sql": null
                }
            },
            "inputs": [],
            "outputs": [],
            "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow"
        },
        {
            "eventType": "START",
            "eventTime": "2022-10-12T14:39:05.292768Z",
            "run": {
                "runId": "a5ecf0bc-3af0-4bf6-b1ad-99168756fb75",
                "facets": {
                    "nominalTime": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
                        "nominalStartTime": "2022-10-12T14:38:59.080496Z",
                        "nominalEndTime": null
                    },
                    "parent": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet",
                        "run": {
                            "runId": "629ce79b-994d-39a4-a850-3fa47ca79ac4"
                        },
                        "job": {
                            "namespace": "airflow_local",
                            "name": "DAG_Name"
                        }
                    },
                    "airflow_runArgs": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "externalTrigger": true
                    },
                    "airflow_version": {
                        "operator": "airflow.operators.bash.BashOperator",
                        "taskInfo": {
                            "_dag": {
                                "dag_id": "DAG_Name",
                                "schedule_interval": "@daily"
                            },
                            "_log": "<Logger ***.task.operators (INFO)>",
                            "pool": "default_pool",
                            "label": "end",
                            "owner": "yan",
                            "queue": "default",
                            "inlets": [],
                            "params": {},
                            "_inlets": [],
                            "outlets": [],
                            "retries": 0,
                            "task_id": "end",
                            "_outlets": [],
                            "pool_slots": 1,
                            "start_date": "2022-10-04T00:00:00+00:00",
                            "retry_delay": "0:05:00",
                            "weight_rule": "downstream",
                            "bash_command": "echo end",
                            "do_xcom_push": true,
                            "trigger_rule": "all_success",
                            "email_on_retry": false,
                            "skip_exit_code": 99,
                            "depends_on_past": false,
                            "executor_config": {},
                            "output_encoding": "utf-8",
                            "priority_weight": 1,
                            "subprocess_hook": "<***.hooks.subprocess.SubprocessHook object at 0x7fe180ce2690>",
                            "email_on_failure": false,
                            "_upstream_task_ids": "{'begin'}",
                            "_lock_for_execution": true,
                            "wait_for_downstream": false,
                            "_downstream_task_ids": "set()",
                            "retry_exponential_backoff": false,
                            "_BaseOperator__instantiated": true
                        },
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "airflowVersion": "2.2.2",
                        "openlineageAirflowVersion": "0.15.1"
                    },
                    "unknownSourceAttribute": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "unknownItems": [
                            {
                                "name": "BashOperator",
                                "type": "operator",
                                "properties": {
                                    "_dag": "<<non-serializable: DAG>>",
                                    "_log": "<<non-serializable: Logger>>",
                                    "pool": "default_pool",
                                    "label": "end",
                                    "owner": "yan",
                                    "queue": "default",
                                    "inlets": [],
                                    "params": {},
                                    "_inlets": [],
                                    "outlets": [],
                                    "retries": 0,
                                    "task_id": "end",
                                    "_outlets": [],
                                    "pool_slots": 1,
                                    "start_date": "<<non-serializable: DateTime>>",
                                    "retry_delay": "<<non-serializable: timedelta>>",
                                    "weight_rule": "downstream",
                                    "bash_command": "echo end",
                                    "do_xcom_push": true,
                                    "trigger_rule": "all_success",
                                    "email_on_retry": false,
                                    "skip_exit_code": 99,
                                    "depends_on_past": false,
                                    "executor_config": {},
                                    "output_encoding": "utf-8",
                                    "priority_weight": 1,
                                    "subprocess_hook": "<<non-serializable: SubprocessHook>>",
                                    "email_on_failure": false,
                                    "_upstream_task_ids": [],
                                    "_lock_for_execution": true,
                                    "wait_for_downstream": false,
                                    "_downstream_task_ids": [],
                                    "retry_exponential_backoff": false,
                                    "_BaseOperator__instantiated": true
                                }
                            }
                        ]
                    }
                }
            },
            "job": {
                "namespace": "airflow_local",
                "name": "DAG_Name.end",
                "facets": {
                    "documentation": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DocumentationJobFacet",
                        "description": "GE with Spark"
                    },
                    "sourceCodeLocation": null,
                    "sql": null
                }
            },
            "inputs": [],
            "outputs": [],
            "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow"
        },
        {
            "eventType": "COMPLETE",
            "eventTime": "2022-10-12T14:39:02.511Z",
            "run": {
                "runId": "2617c6ef-406f-4ad7-8f29-220a648f2467",
                "facets": {
                    "nominalTime": null,
                    "parent": null,
                    "unknownSourceAttribute": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "unknownItems": [
                            {
                                "name": "BashOperator",
                                "type": "operator",
                                "properties": {
                                    "_dag": "<<non-serializable: DAG>>",
                                    "_log": "<<non-serializable: Logger>>",
                                    "pool": "default_pool",
                                    "label": "begin",
                                    "owner": "yan",
                                    "queue": "default",
                                    "inlets": [],
                                    "params": {},
                                    "_inlets": [],
                                    "outlets": [],
                                    "retries": 0,
                                    "task_id": "begin",
                                    "_outlets": [],
                                    "pool_slots": 1,
                                    "start_date": "<<non-serializable: DateTime>>",
                                    "retry_delay": "<<non-serializable: timedelta>>",
                                    "weight_rule": "downstream",
                                    "bash_command": "echo begin",
                                    "do_xcom_push": true,
                                    "trigger_rule": "all_success",
                                    "email_on_retry": false,
                                    "skip_exit_code": 99,
                                    "depends_on_past": false,
                                    "executor_config": {},
                                    "output_encoding": "utf-8",
                                    "priority_weight": 1,
                                    "subprocess_hook": "<<non-serializable: SubprocessHook>>",
                                    "email_on_failure": false,
                                    "_upstream_task_ids": [],
                                    "_lock_for_execution": true,
                                    "wait_for_downstream": false,
                                    "_downstream_task_ids": [],
                                    "retry_exponential_backoff": false,
                                    "_BaseOperator__instantiated": true
                                }
                            }
                        ]
                    }
                }
            },
            "job": {
                "namespace": "airflow_local",
                "name": "DAG_Name.begin",
                "facets": {
                    "documentation": null,
                    "sourceCodeLocation": null,
                    "sql": null
                }
            },
            "inputs": [],
            "outputs": [],
            "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow"
        },
        {
            "eventType": "START",
            "eventTime": "2022-10-12T14:39:01.806498Z",
            "run": {
                "runId": "2617c6ef-406f-4ad7-8f29-220a648f2467",
                "facets": {
                    "nominalTime": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/NominalTimeRunFacet",
                        "nominalStartTime": "2022-10-12T14:38:59.080496Z",
                        "nominalEndTime": null
                    },
                    "parent": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet",
                        "run": {
                            "runId": "629ce79b-994d-39a4-a850-3fa47ca79ac4"
                        },
                        "job": {
                            "namespace": "airflow_local",
                            "name": "DAG_Name"
                        }
                    },
                    "airflow_runArgs": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "externalTrigger": true
                    },
                    "airflow_version": {
                        "operator": "airflow.operators.bash.BashOperator",
                        "taskInfo": {
                            "_dag": {
                                "dag_id": "DAG_Name",
                                "schedule_interval": "@daily"
                            },
                            "_log": "<Logger ***.task.operators (INFO)>",
                            "pool": "default_pool",
                            "label": "begin",
                            "owner": "yan",
                            "queue": "default",
                            "inlets": [],
                            "params": {},
                            "_inlets": [],
                            "outlets": [],
                            "retries": 0,
                            "task_id": "begin",
                            "_outlets": [],
                            "pool_slots": 1,
                            "start_date": "2022-10-04T00:00:00+00:00",
                            "retry_delay": "0:05:00",
                            "weight_rule": "downstream",
                            "bash_command": "echo begin",
                            "do_xcom_push": true,
                            "trigger_rule": "all_success",
                            "email_on_retry": false,
                            "skip_exit_code": 99,
                            "depends_on_past": false,
                            "executor_config": {},
                            "output_encoding": "utf-8",
                            "priority_weight": 1,
                            "subprocess_hook": "<***.hooks.subprocess.SubprocessHook object at 0x7fe180cdd050>",
                            "email_on_failure": false,
                            "_upstream_task_ids": "set()",
                            "_lock_for_execution": true,
                            "wait_for_downstream": false,
                            "_downstream_task_ids": "{'end'}",
                            "retry_exponential_backoff": false,
                            "_BaseOperator__instantiated": true
                        },
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "airflowVersion": "2.2.2",
                        "openlineageAirflowVersion": "0.15.1"
                    },
                    "unknownSourceAttribute": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                        "unknownItems": [
                            {
                                "name": "BashOperator",
                                "type": "operator",
                                "properties": {
                                    "_dag": "<<non-serializable: DAG>>",
                                    "_log": "<<non-serializable: Logger>>",
                                    "pool": "default_pool",
                                    "label": "begin",
                                    "owner": "yan",
                                    "queue": "default",
                                    "inlets": [],
                                    "params": {},
                                    "_inlets": [],
                                    "outlets": [],
                                    "retries": 0,
                                    "task_id": "begin",
                                    "_outlets": [],
                                    "pool_slots": 1,
                                    "start_date": "<<non-serializable: DateTime>>",
                                    "retry_delay": "<<non-serializable: timedelta>>",
                                    "weight_rule": "downstream",
                                    "bash_command": "echo begin",
                                    "do_xcom_push": true,
                                    "trigger_rule": "all_success",
                                    "email_on_retry": false,
                                    "skip_exit_code": 99,
                                    "depends_on_past": false,
                                    "executor_config": {},
                                    "output_encoding": "utf-8",
                                    "priority_weight": 1,
                                    "subprocess_hook": "<<non-serializable: SubprocessHook>>",
                                    "email_on_failure": false,
                                    "_upstream_task_ids": [],
                                    "_lock_for_execution": true,
                                    "wait_for_downstream": false,
                                    "_downstream_task_ids": [],
                                    "retry_exponential_backoff": false,
                                    "_BaseOperator__instantiated": true
                                }
                            }
                        ]
                    }
                }
            },
            "job": {
                "namespace": "airflow_local",
                "name": "DAG_Name.begin",
                "facets": {
                    "documentation": {
                        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow",
                        "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/DocumentationJobFacet",
                        "description": "GE with Spark"
                    },
                    "sourceCodeLocation": null,
                    "sql": null
                }
            },
            "inputs": [],
            "outputs": [],
            "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow"
        }
    ]
}
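Here is a small helper for checking the timeline of events like the ones above. It is a minimal sketch. The base URL `http://localhost:5000` is an assumption about where the API is served, and `fetch_events`, `parse_ts`, `run_timeline` and `make_event` are hypothetical names. Applied to the four events above, it shows two task runs, each with a START and a COMPLETE, and no entry at all for the `DAG_Name` job.

```python
import json
import urllib.request
from datetime import datetime

TERMINAL_STATES = {"COMPLETE", "FAIL", "ABORT"}


def fetch_events(base_url: str = "http://localhost:5000") -> list:
    """GET /api/v1/events/lineage and return its "events" list (base URL assumed)."""
    url = base_url.rstrip("/") + "/api/v1/events/lineage"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)["events"]


def parse_ts(ts: str) -> datetime:
    # eventTime values end in "Z"; fromisoformat() before Python 3.11 needs "+00:00".
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def run_timeline(events: list) -> dict:
    """Map (job name, run ID) to the earliest START and latest terminal time."""
    runs = {}
    for event in events:
        key = (event["job"]["name"], event["run"]["runId"])
        entry = runs.setdefault(key, {"start": None, "end": None})
        t = parse_ts(event["eventTime"])
        if event["eventType"] == "START":
            entry["start"] = t if entry["start"] is None else min(entry["start"], t)
        elif event["eventType"] in TERMINAL_STATES:
            entry["end"] = t if entry["end"] is None else max(entry["end"], t)
    return runs


def make_event(event_type: str, time: str, job: str, run_id: str) -> dict:
    return {"eventType": event_type, "eventTime": time,
            "job": {"name": job}, "run": {"runId": run_id}}


BEGIN_RUN = "2617c6ef-406f-4ad7-8f29-220a648f2467"
END_RUN = "a5ecf0bc-3af0-4bf6-b1ad-99168756fb75"

# Offline stand-in for fetch_events(): the four events above, trimmed.
events = [
    make_event("COMPLETE", "2022-10-12T14:39:05.654Z", "DAG_Name.end", END_RUN),
    make_event("START", "2022-10-12T14:39:05.292768Z", "DAG_Name.end", END_RUN),
    make_event("COMPLETE", "2022-10-12T14:39:02.511Z", "DAG_Name.begin", BEGIN_RUN),
    make_event("START", "2022-10-12T14:39:01.806498Z", "DAG_Name.begin", BEGIN_RUN),
]

for (job, _run_id), span in sorted(run_timeline(events).items()):
    print(job, (span["end"] - span["start"]).total_seconds())
# DAG_Name.begin 0.704502
# DAG_Name.end 0.361232
```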

Expected behaviour

The job associated with the DAG should be marked as finished when the DAG run finishes, instead of staying in the RUNNING state with epoch start and end dates.
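For comparison, here is a sketch of the DAG-level events one might expect. They are written as plain dicts shaped like the task events in this issue. This is not the openlineage-airflow or openlineage-python API, and `dag_run_events` is a hypothetical helper. The sketch reuses the parent run ID that the task events already reference. As a simplifying assumption, it takes the earliest task START and the latest task COMPLETE as the DAG run's start and end times.

```python
PRODUCER = "https://github.com/OpenLineage/OpenLineage/tree/0.15.1/integration/airflow"


def dag_run_events(namespace, dag_id, run_id, start_time, end_time, producer=PRODUCER):
    """Build a START/COMPLETE pair for the DAG-level (parent) run."""
    def event(event_type, event_time):
        return {
            "eventType": event_type,
            "eventTime": event_time,
            "run": {"runId": run_id, "facets": {}},
            "job": {"namespace": namespace, "name": dag_id, "facets": {}},
            "inputs": [],
            "outputs": [],
            "producer": producer,
        }

    return [event("START", start_time), event("COMPLETE", end_time)]


start, complete = dag_run_events(
    "airflow_local",
    "DAG_Name",
    "629ce79b-994d-39a4-a850-3fa47ca79ac4",
    start_time="2022-10-12T14:39:01.806498Z",  # START of task "begin"
    end_time="2022-10-12T14:39:05.654Z",       # COMPLETE of task "end"
)
print(start["eventType"], complete["eventType"], complete["job"]["name"])
# START COMPLETE DAG_Name
```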

@mobuchowski self-assigned this on Oct 12, 2022
@wslulciuc (Member) commented:

@YLibert: Airflow currently has a draft PR open (by our very own @mobuchowski) to add support for handling DAG state changes, which will resolve OpenLineage runs hanging in the RUNNING state. Let's keep this issue open until official DAG state change support has been released.
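The raw events contain one detail that may matter for DAG-level handling, or for an interim workaround. The parent run ID 629ce79b-994d-39a4-a850-3fa47ca79ac4 is a version-3 (name-based) UUID, while both task run IDs are version 4 (random). A name-based UUID is deterministic, so code running at the DAG level could in principle recompute the ID the tasks reference. This issue does not show the exact name string that openlineage-airflow hashes. For that reason, `dag_run_uuid` and its name format below are hypothetical and will not reproduce the real ID.

```python
import uuid


def uuid_version(run_id: str) -> int:
    """Return the UUID version number encoded in a run ID string."""
    return uuid.UUID(run_id).version


def dag_run_uuid(dag_id: str, dag_run_id: str) -> str:
    # HYPOTHETICAL name format: the string actually hashed by
    # openlineage-airflow is not shown in this issue.
    return str(uuid.uuid3(uuid.NAMESPACE_URL, f"{dag_id}.{dag_run_id}"))


print(uuid_version("629ce79b-994d-39a4-a850-3fa47ca79ac4"))  # 3 (parent / DAG run)
print(uuid_version("a5ecf0bc-3af0-4bf6-b1ad-99168756fb75"))  # 4 (task "end")
print(uuid_version("2617c6ef-406f-4ad7-8f29-220a648f2467"))  # 4 (task "begin")

# Name-based UUIDs are stable: the same inputs always yield the same ID.
print(dag_run_uuid("DAG_Name", "example_run_id") == dag_run_uuid("DAG_Name", "example_run_id"))  # True
```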
