Support multiple DagProcessors parsing files from different locations. #25935

Merged
19 commits merged into apache:main from the multiple_dp branch on Sep 6, 2022

mhenc
Collaborator

@mhenc mhenc commented Aug 24, 2022

Support running multiple standalone DagProcessors, each configured to parse dags from a different directory.

Changes:

  • Add a new column 'dag_directory' to DagModel and SerializedDagModel
  • Expose the dag_directory used by the current DagProcessorManager (to set the value for DagModel/SerializedDagModel)
  • Make sure a DagProcessor marks only its own dags as inactive (based on 'dag_directory')
  • Make sure a DagProcessor receives callbacks only for its own dags (based on 'dag_directory')
  • Add a method to SchedulerJob that marks dags as stale if they were not updated within the last 10 minutes (e.g. when one of the DagProcessors was killed)
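The two ownership rules above (deactivation and callbacks scoped by 'dag_directory') can be illustrated with a minimal, in-memory Python sketch. It assumes every dag row and every callback request carries the dag_directory of the processor that owns it. `DagRow`, `Callback`, `deactivate_missing_dags` and `callbacks_for` are hypothetical names for illustration only, not Airflow's API; the real change presumably applies the same filter in database queries.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set


@dataclass
class DagRow:
    """Hypothetical, simplified stand-in for a DagModel row."""
    dag_id: str
    dag_directory: str
    is_active: bool = True


@dataclass
class Callback:
    """Hypothetical callback request, tagged with the directory of its dag."""
    dag_id: str
    dag_directory: str


def deactivate_missing_dags(
    rows: Iterable[DagRow], my_directory: str, seen_dag_ids: Set[str]
) -> List[str]:
    """Deactivate dags owned by this processor that were not seen in the latest parse.

    Rows belonging to other directories are never touched, so one processor
    cannot deactivate another processor's dags.
    """
    deactivated = []
    for row in rows:
        owned = row.dag_directory == my_directory
        if owned and row.is_active and row.dag_id not in seen_dag_ids:
            row.is_active = False
            deactivated.append(row.dag_id)
    return deactivated


def callbacks_for(callbacks: Iterable[Callback], my_directory: str) -> List[Callback]:
    """Keep only the callbacks whose dag lives in this processor's directory."""
    return [cb for cb in callbacks if cb.dag_directory == my_directory]
```

For example, a processor started with --subdir /files/dags/dags1 that no longer finds dag "b" deactivates "b", while dags owned by /files/dags/dags2 stay active.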

Usage:

  • Make sure [scheduler]standalone_dag_processor is true
  • Run one dag processor per directory, e.g.:
      airflow dag-processor --subdir /files/dags/dags1
      airflow dag-processor --subdir /files/dags/dags2
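The usage steps can also be scripted. This is a small Python launcher sketch, assuming the `airflow` CLI is on PATH and that enabling the option through Airflow's `AIRFLOW__<SECTION>__<KEY>` environment-variable convention suits your deployment (the scheduler needs the same setting). The function names are hypothetical.

```python
import os
import subprocess
from typing import Dict, List, Mapping, Optional


def build_dag_processor_commands(subdirs: List[str]) -> List[List[str]]:
    """One `airflow dag-processor --subdir <dir>` command line per directory."""
    return [["airflow", "dag-processor", "--subdir", subdir] for subdir in subdirs]


def processor_env(base_env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Copy the environment and enable [scheduler]standalone_dag_processor
    using Airflow's AIRFLOW__<SECTION>__<KEY> environment-variable convention."""
    env = dict(os.environ if base_env is None else base_env)
    env["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "True"
    return env


def launch_all(subdirs: List[str]) -> List[subprocess.Popen]:
    """Start one standalone dag processor per directory (requires `airflow` on PATH)."""
    env = processor_env()
    return [subprocess.Popen(cmd, env=env) for cmd in build_dag_processor_commands(subdirs)]
```

Calling launch_all(["/files/dags/dags1", "/files/dags/dags2"]) starts both processors; call wait() on each returned process to keep the launcher in the foreground.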

Part of https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation

@boring-cyborg boring-cyborg bot added area:providers area:Scheduler Scheduler or dag parsing Issues area:serialization kind:documentation provider:microsoft-azure Azure-related issues provider:google Google (including GCP) related issues labels Aug 24, 2022
@mhenc mhenc changed the title Multiple dp Support multiple DagProcessor parsing files from different locations. Aug 24, 2022
@mhenc mhenc force-pushed the multiple_dp branch 6 times, most recently from 9dd1396 to fb3298a August 26, 2022 09:34
@mhenc mhenc closed this Aug 26, 2022
@mhenc mhenc reopened this Aug 26, 2022
@mhenc mhenc marked this pull request as ready for review August 26, 2022 09:38
@mhenc
Collaborator Author

mhenc commented Aug 26, 2022

Not sure what the state of the 2.4.0 release is. If this PR can't fit in, I believe it can wait until 2.5.0.

@mhenc mhenc changed the title Support multiple DagProcessor parsing files from different locations. Support multiple DagProcessors parsing files from different locations. Aug 26, 2022
Member

@ashb ashb left a comment

Rather than a global variable (which is what DagProcessorDirectory is), how about:

Add an attribute to the DagFileProcessorProcess constructor (passed down from the Manager), and add a dag_directory argument to DagBag.sync_to_db that can be passed down to DAG.bulk_write_to_db and SerializedDagModel.write_dag.
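A minimal sketch of the explicit-passing design suggested above: the manager hands its dag_directory to each file processor, the processor passes it to sync_to_db, and sync_to_db passes it to the write. The classes are hypothetical, simplified stand-ins named after the Manager, DagFileProcessorProcess, DagBag.sync_to_db and SerializedDagModel.write_dag; their signatures are not Airflow's.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SerializedDagStore:
    """Hypothetical stand-in for the SerializedDagModel table: dag_id -> dag_directory."""
    rows: Dict[str, Optional[str]] = field(default_factory=dict)

    def write_dag(self, dag_id: str, dag_directory: Optional[str] = None) -> None:
        self.rows[dag_id] = dag_directory


class DagBagSketch:
    """Hypothetical stand-in for DagBag; only the sync step is modelled."""

    def __init__(self, dag_ids: List[str]) -> None:
        self.dag_ids = list(dag_ids)

    def sync_to_db(self, store: SerializedDagStore, dag_directory: Optional[str] = None) -> None:
        # The directory arrives as an argument instead of being read from a global.
        for dag_id in self.dag_ids:
            store.write_dag(dag_id, dag_directory=dag_directory)


class FileProcessorSketch:
    """Hypothetical stand-in for DagFileProcessorProcess."""

    def __init__(self, file_path: str, dag_directory: str) -> None:
        self.file_path = file_path
        self.dag_directory = dag_directory  # handed down by the manager

    def process(self, dag_ids: List[str], store: SerializedDagStore) -> None:
        DagBagSketch(dag_ids).sync_to_db(store, dag_directory=self.dag_directory)


class ManagerSketch:
    """Hypothetical stand-in for the processor manager, configured with one directory."""

    def __init__(self, dag_directory: str) -> None:
        self.dag_directory = dag_directory

    def start_processor(self, file_path: str) -> FileProcessorSketch:
        return FileProcessorSketch(file_path, self.dag_directory)
```

Compared with a module-level global, each manager carries its own value, so two managers in one process (for example in tests) cannot overwrite each other's directory.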

airflow/dag_processing/dag_directory/dag_directory.py (outdated, resolved)
airflow/dag_processing/manager.py (outdated, resolved)
airflow/jobs/scheduler_job.py (outdated, resolved)
@potiuk potiuk removed provider:google Google (including GCP) related issues provider:microsoft-azure Azure-related issues area:providers labels Aug 26, 2022
@mhenc mhenc force-pushed the multiple_dp branch 2 times, most recently from 7c76fec to edab1fc August 29, 2022 12:23
@mhenc mhenc requested a review from ashb September 6, 2022 14:57
@ashb ashb merged commit f878854 into apache:main Sep 6, 2022
@mhenc mhenc deleted the multiple_dp branch September 6, 2022 19:26
@@ -1077,6 +1077,10 @@ standalone_dag_processor = False
# in database. Contains maximum number of callbacks that are fetched during a single loop.
max_callbacks_per_loop = 20

# Only applicable if `[scheduler]standalone_dag_processor` is true.
# Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
dag_stale_not_seen_duration = 600
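The check this option controls can be sketched as follows, assuming a per-dag timestamp of the last successful parse is available. `find_stale_dags` is a hypothetical helper, not the SchedulerJob method added by this PR; the default of 600 seconds matches the 10 minutes mentioned in the PR description.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def find_stale_dags(
    last_parsed: Dict[str, datetime],
    now: datetime,
    dag_stale_not_seen_duration: int = 600,
) -> List[str]:
    """Return ids of dags whose last parse is older than the threshold (in seconds)."""
    cutoff = now - timedelta(seconds=dag_stale_not_seen_duration)
    return sorted(dag_id for dag_id, parsed_at in last_parsed.items() if parsed_at < cutoff)
```

With the default of 600 seconds, a dag last parsed 601 seconds ago is reported as stale, while one parsed 30 seconds ago is not. This covers the case where the processor owning a directory has been killed and nobody refreshes its dags.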
Member

dag_stale_not_seen_duration: any suggestions for a better name? This config name isn't easy to understand.

Member

eh.. naming things...

Let me be wild on that one:

deactivation_time_for_missing_dags_in_standalone_dag_processor_mode

Collaborator Author

Another suggestion from @ashb was mark_dag_stale_not_seen_in. Is it better?

Member

All of them are awful :)

Actually, your suggestion, @potiuk, helped me understand the meaning of that parameter ;p

Member

Horrible names can also be best :)

Labels
AIP-43 (DAG processor separation), area:Scheduler (Scheduler or dag parsing Issues), area:serialization, kind:documentation, type:new-feature (Changelog: New Features)

6 participants