
Store callbacks in database if standalone_dag_processor config is True. #21731

Merged
6 commits merged into apache:main on Mar 8, 2022

Conversation

@mhenc (Collaborator) commented Feb 22, 2022:

This change introduces a new config option: [scheduler] standalone_dag_processor. If it is set to True, all callbacks are stored in the dedicated callback_request table in the database. They are then read from there by the DagProcessor and executed.

If the option is False, callbacks are still sent to the DagProcessor over a Pipe.

This change will allow us to run the DagProcessor in a separate process, independently of SchedulerJob.

Part of AIP-43
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-43+DAG+Processor+separation
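
For reference, enabling the new mode in a deployment looks roughly like this (a sketch based on the description above and the environment-variable form used later in this thread; adjust paths and values for your setup):

# airflow.cfg
[scheduler]
standalone_dag_processor = True

# or the equivalent environment variable
AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=true

# then run the DAG processor as its own process
airflow dag-processor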

The boring-cyborg bot added the provider:cncf-kubernetes (Kubernetes provider related issues), area:Scheduler (Scheduler or dag parsing issues), and kind:documentation labels on Feb 22, 2022
@mhenc mhenc force-pushed the callbacks_db branch 5 times, most recently from 88f30c1 to 58f8973 on February 23, 2022 10:15
@mhenc mhenc marked this pull request as ready for review February 23, 2022 10:43
@mhenc mhenc force-pushed the callbacks_db branch 3 times, most recently from 29a7721 to 93b61e9 on February 28, 2022 19:44
@@ -858,7 +865,7 @@ def _do_scheduling(self, session) -> int:
                     self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                     continue

-                self._send_dag_callbacks_to_processor(dag, callback_to_run)
+                callbacks_to_send.append((dag, callback_to_run))
@mhenc (Collaborator, PR author):

I have a concern about this part: is it critical to send the callbacks at this exact moment?
Would it be a big problem if we moved this to the bottom, outside of the commit guard?

Member:

Hmm, this may change the behaviour in some cases. Judging from the expunge_all() call below, I suspect the callbacks are allowed to update the database before the critical section below. Moving the callbacks to after the section means they no longer have this ability.

@mhenc (Collaborator, PR author):

Alternatively, we could end the previous guard just before this point, send the callbacks to the DB, and then start another guard below.
I don't know how important it is to keep a single guard for all of lines 843-900.

Member:

To be honest, I think the guard could be stopped here and restarted later (or maybe it is not even needed later, @ashb?). I think the main reason for the "prohibit_commit" guard was to make sure the DagRun rows are locked, but after the commit above they are already released anyway. From this point on, the guard only makes sure that none of the code performs an accidental commit (it no longer protects the DagRun rows, which are released at this point), so conceptually I see no problem if the callbacks are run outside of the guard and we have a separate guard for the critical section.

That would make quite a lot of sense actually.

Member:

Yeah, makes sense, Jarek:

        with prohibit_commit(session) as guard:
            ...
            guard.commit()

        # Send the callbacks after we commit to ensure the context is up to date when it gets run
        for dag_run, callback_to_run in callback_tuples:
            dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
            if not dag:
                self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
                continue

            self._send_dag_callbacks_to_processor(dag, callback_to_run)

        with prohibit_commit(session) as guard:

@mhenc (Collaborator, PR author):

Thank you!

@mhenc mhenc requested a review from uranusjr March 1, 2022 09:01
if hasattr(ti, 'run_as_user'):
    self._run_as_user = ti.run_as_user
self._pool: str = ti.pool
self._run_as_user = run_as_user
Member:

Ah nice :)

return NotImplemented

@classmethod
def from_ti(cls, ti: TaskInstance):
Member:

Good Idea.
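
For readers following along, here is a minimal, self-contained sketch of the from_ti factory pattern being discussed (the class and field names below are illustrative stand-ins, not the actual Airflow implementation):

from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskInstanceStub:
    # Stand-in for airflow.models.TaskInstance, reduced to a few attributes.
    dag_id: str
    task_id: str
    run_id: str
    pool: str = "default_pool"
    run_as_user: Optional[str] = None


@dataclass
class SimpleTaskInstanceSketch:
    # Lightweight snapshot passed around with callback requests.
    dag_id: str
    task_id: str
    run_id: str
    pool: str
    run_as_user: Optional[str]

    @classmethod
    def from_ti(cls, ti: TaskInstanceStub) -> "SimpleTaskInstanceSketch":
        # Copy only what the callback machinery needs; fall back gracefully
        # when the source object does not carry run_as_user.
        return cls(
            dag_id=ti.dag_id,
            task_id=ti.task_id,
            run_id=ti.run_id,
            pool=ti.pool,
            run_as_user=getattr(ti, "run_as_user", None),
        )

The appeal of the factory is that the construction logic lives in one place instead of being repeated wherever a snapshot of a task instance is needed.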

@potiuk (Member) left a comment:

Hmm. I have a few comments. Most are less important, but I think the "parallelism" query for fetching callbacks is important to discuss. I might be missing something though, so I'm happy to discuss it.

@mhenc mhenc requested a review from potiuk March 7, 2022 15:55
# Nothing to do if callbacks are not stored in the database
return
self.log.debug("Fetching callbacks from the database.")
query = (
Member:

It would be good to add the "commit guard" here. The code is very simple now and we probably won't make a mistake, but the guard will prevent future mistakes where someone accidentally adds a commit and releases the lock.

@mhenc (Collaborator, PR author):

Added guard.
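
For context, a rough sketch of what the guarded fetch could look like (a sketch only; the DbCallbackRequest model, its get_callback_request() accessor, and the ordering column are assumptions here, not a quote of the merged code):

from airflow.models.db_callback_request import DbCallbackRequest  # assumed model behind the callback_request table
from airflow.utils.sqlalchemy import prohibit_commit


def fetch_callbacks(max_callbacks, session):
    """Read up to max_callbacks requests from the callback_request table and delete them."""
    callbacks = []
    with prohibit_commit(session) as guard:
        query = (
            session.query(DbCallbackRequest)
            .order_by(DbCallbackRequest.priority_weight.asc())  # assumed column
            .limit(max_callbacks)
            .with_for_update(skip_locked=True)  # allow several processors to coexist
        )
        for db_callback in query.all():
            callbacks.append(db_callback.get_callback_request())  # assumed accessor
            session.delete(db_callback)
        # Release the row locks through the guard, never via a bare session.commit().
        guard.commit()
    return callbacks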

@potiuk (Member) left a comment:

Two more small things from my side.

@potiuk (Member) left a comment:

I am cool with that! Good job @mhenc !

@potiuk (Member) commented Mar 8, 2022:

Some tests are failing though :)

@potiuk potiuk merged commit 5ace37a into apache:main Mar 8, 2022
@mhenc mhenc deleted the callbacks_db branch March 8, 2022 13:57
@ephraimbuddy ephraimbuddy added the type:new-feature (Changelog: New Features) label on Apr 8, 2022
@jyotsa09 commented Apr 21, 2022:

Hi @mhenc, I have not tested this completely, but I ran into some issues; I'm not sure whether they are valid or not. Let me explain:
I updated my Docker file with AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR=true and then ran the airflow dag-processor command. After this change the dag-processor started working, and I triggered this DAG: CallbackDag. I expected this to insert data into the callback_request table, but for me nothing was inserted into that table.
I saw this in the AIP-43 docs: "Scheduler-Process (via the Executor), instead of sending the callbacks to the queue, adds them to the database." and observed these scheduler logs:

[2022-04-21 11:31:57,260] {scheduler_job.py:547} INFO - Sending TaskInstanceKey(dag_id='example_callback', task_id='task2', run_id='manual__2022-04-21T11:31:55.720765+00:00', try_number=1, map_index=-1) to executor with priority 2 and queue default
[2022-04-21 11:31:57,261] {base_executor.py:88} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'example_callback', 'task2', 'manual__2022-04-21T11:31:55.720765+00:00', '--local', '--subdir', 'DAGS_FOLDER/new_dag.py']
[2022-04-21 11:31:57,270] {local_executor.py:79} INFO - QueuedLocalWorker running ['airflow', 'tasks', 'run', 'example_callback', 'task2', 'manual__2022-04-21T11:31:55.720765+00:00', '--local', '--subdir', 'DAGS_FOLDER/new_dag.py']

If there is anything I need to change, please let me know.

@potiuk (Member) commented Apr 25, 2022:

@mhenc - any comments on that?

@mhenc (Collaborator, PR author) commented Apr 27, 2022:

@jyotsa09 I just tested it and the callbacks do seem to be executed. However, instead of "print" I wrote to a file, as I am not sure whether stdout from callbacks is redirected to the logs.
Something like:

def dag_success_alert(context):
    # Append a marker to a file so we can verify that the callback actually ran.
    with open("/tmp/success.txt", "a") as f:
        f.write(f"DAG has succeeded, run_id: {context['run_id']}")

Note that callbacks are removed from the DB by the DagProcessor when it fetches them:
https://github.com/apache/airflow/blob/main/airflow/dag_processing/manager.py#L661
so they stay in the DB for only a very short time.
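
For anyone trying to reproduce jyotsa09's setup, here is a minimal DAG sketch that wires such a callback (assuming Airflow 2.3+; dag_id and task_id are taken from the logs above, everything else is arbitrary):

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def dag_success_alert(context):
    # Write to a file so the callback leaves a visible trace even if its
    # stdout is not redirected to the DAG processor logs.
    with open("/tmp/success.txt", "a") as f:
        f.write(f"DAG has succeeded, run_id: {context['run_id']}\n")


with DAG(
    dag_id="example_callback",
    start_date=datetime(2022, 4, 1),
    schedule_interval=None,
    catchup=False,
    on_success_callback=dag_success_alert,
) as dag:
    EmptyOperator(task_id="task2")

With the standalone processor enabled, triggering this DAG should briefly produce a row in callback_request and then append a line to /tmp/success.txt once the DagProcessor has executed the callback.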

@ldacey (Contributor) commented Jun 10, 2022:

FYI - I disabled the standalone DAG processor and it fixed my issues, but while it was enabled my scheduler would sometimes die. Here are some logs where it died almost immediately after I deployed Airflow:


| [2022-06-09 19:11:22,231] {scheduler_job.py:696} INFO - Starting the scheduler
| [2022-06-09 19:11:22,231] {scheduler_job.py:701} INFO - Processing each file at most -1 times
| [2022-06-09 19:11:22,501] {executor_loader.py:105} INFO - Loaded executor: CeleryExecutor
| [2022-06-09 19:11:22,502] {scheduler_job.py:1221} INFO - Resetting orphaned tasks for active dag runs
| [2022-06-09 19:11:22,656] {celery_executor.py:532} INFO - Adopted the following 5 tasks from a dead executor
|       <TaskInstance: cms-agent.download.extract scheduled__2022-06-09T17:33:00+00:00 [running]> in state STARTED
|       <TaskInstance: gts-received.export-gts-received scheduled__2022-06-09T16:03:00+00:00 [queued]> in state STARTED
|       <TaskInstance: cms-agent.download.extract scheduled__2022-06-09T17:33:00+00:00 [running]> in state STARTED
|       <TaskInstance: cms-split.transform scheduled__2022-06-09T17:32:00+00:00 [running]> in state STARTED
| [2022-06-09 19:11:22,777] {dag.py:2927} INFO - Setting next_dagrun for telecom-gts to 2022-06-09T19:05:00+00:00, run_after=2022-06-09T19:35:00+00:00
| [2022-06-09 19:11:27,118] {font_manager.py:1443} INFO - generated new fontManager
| [2022-06-09 19:11:31,341] {scheduler_job.py:1126} INFO - Run scheduled__2022-06-09T17:33:00+00:00 of cms-agent has timed-out
| [2022-06-09 19:11:31,347] {dag.py:2927} INFO - Setting next_dagrun for cms-agent to 2022-06-09T18:33:00+00:00, run_after=2022-06-09T19:03:00+00:00
| [2022-06-09 19:11:31,349] {scheduler_job.py:756} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
| Traceback (most recent call last):
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
|     self._run_scheduler_loop()
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
|     num_queued_tis = self._do_scheduling(session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
|     callback_to_run = self._schedule_dag_run(dag_run, session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1141, in _schedule_dag_run
|     self._send_dag_callbacks_to_processor(dag, callback_to_execute)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1184, in _send_dag_callbacks_to_processor
|     self.executor.send_callback(callback)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 363, in send_callback
|     self.callback_sink.send(request)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
|     with create_session() as session:
|   File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
|     next(self.gen)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 33, in create_session
|     session.commit()
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
|     self._transaction.commit(_to_root=self.future)
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 829, in commit
|     self._prepare_impl()
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
|     self.session.dispatch.before_commit(self.session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 320, in __call__
|     fn(*args, **kw)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 268, in _validate_commit
|     raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
| RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
| [2022-06-09 19:11:31,372] {scheduler_job.py:768} INFO - Exited execute loop
| Traceback (most recent call last):
|   File "/home/airflow/.local/bin/airflow", line 8, in <module>
|     sys.exit(main())
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/__main__.py", line 38, in main
|     args.func(args)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/cli_parser.py", line 51, in command
|     return func(*args, **kwargs)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/cli.py", line 99, in wrapper
|     return f(*args, **kwargs)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 75, in scheduler
|     _run_scheduler_job(args=args)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/cli/commands/scheduler_command.py", line 46, in _run_scheduler_job
|     job.run()
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/base_job.py", line 244, in run
|     self._execute()
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 739, in _execute
|     self._run_scheduler_loop()
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 827, in _run_scheduler_loop
|     num_queued_tis = self._do_scheduling(session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 909, in _do_scheduling
|     callback_to_run = self._schedule_dag_run(dag_run, session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1141, in _schedule_dag_run
|     self._send_dag_callbacks_to_processor(dag, callback_to_execute)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py", line 1184, in _send_dag_callbacks_to_processor
|     self.executor.send_callback(callback)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/executors/base_executor.py", line 363, in send_callback
|     self.callback_sink.send(request)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
|     with create_session() as session:
|   File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
|     next(self.gen)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 33, in create_session
|     session.commit()
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
|     self._transaction.commit(_to_root=self.future)
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 829, in commit
|     self._prepare_impl()
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
|     self.session.dispatch.before_commit(self.session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 320, in __call__
|     fn(*args, **kw)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 268, in _validate_commit
|     raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
| RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 70, in wrapper
|     with create_session() as session:
|   File "/usr/local/lib/python3.10/contextlib.py", line 142, in __exit__
|     next(self.gen)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 33, in create_session
|     session.commit()
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1423, in commit
|     self._transaction.commit(_to_root=self.future)
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 829, in commit
|     self._prepare_impl()
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 797, in _prepare_impl
|     self.session.dispatch.before_commit(self.session)
|   File "/home/airflow/.local/lib/python3.10/site-packages/sqlalchemy/event/attr.py", line 320, in __call__
|     fn(*args, **kw)
|   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/sqlalchemy.py", line 268, in _validate_commit
|     raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")
| RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!

A pull request was already created (I discussed this on Slack), so I am not sure if an issue should be raised or not: #24366

@nicolamarangoni (Contributor) commented:
Hello, I'm also using separate scheduler and dag-processor processes in version 2.3.2, and my scheduler is dying with
RuntimeError: UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!
Should I avoid using separate scheduler and dag-processor processes until this is fixed?

@potiuk (Member) commented Jun 20, 2022:

This is likely a known issue - it comes from the separated dag-processor and should be fixed in 2.3.3 with #24366. The proposed fix is very simple (literally one line of code), so you might want to apply it to your installation now; you will also get it when we release 2.3.3. But you should monitor the changes in #24366 and see whether we find a better/more complete fix before it is merged.
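
To make the failure mode easier to picture, here is a toy, self-contained model of the guard mechanism (not the Airflow implementation): the scheduling loop runs inside a "prohibit commit" guard, and send_callback's database sink ends up committing the session without going through the guard, which is exactly what the guard forbids.

from contextlib import contextmanager


class ToySession:
    """Stand-in for a SQLAlchemy session with a before-commit hook."""

    def __init__(self):
        self._before_commit = None

    def commit(self):
        if self._before_commit is not None:
            self._before_commit()


@contextmanager
def prohibit_commit(session):
    """Toy guard: any commit that is not announced through the guard raises."""
    expected_commits = []

    def _validate_commit():
        if expected_commits:
            expected_commits.pop()
            return
        raise RuntimeError("UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!")

    class Guard:
        def commit(self):
            expected_commits.append(True)
            session.commit()

    session._before_commit = _validate_commit
    try:
        yield Guard()
    finally:
        session._before_commit = None


session = ToySession()
try:
    with prohibit_commit(session) as guard:
        guard.commit()    # fine: the commit is announced through the guard
        session.commit()  # roughly what the callback sink did -> error
except RuntimeError as err:
    print(err)  # UNEXPECTED COMMIT - THIS WILL BREAK HA LOCKS!

In the real scheduler, the second commit came from the create_session() call inside callback_sink.send(), as the traceback above shows.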

@nicolamarangoni (Contributor) commented:
I added this to my image to make it work:

RUN wget https://raw.githubusercontent.com/apache/airflow/796debe094af0659c0140bbb07b6db4d2df7ec94/airflow/jobs/scheduler_job.py
RUN rm /usr/lib/python3.10/site-packages/airflow/jobs/scheduler_job.py
RUN mv scheduler_job.py /usr/lib/python3.10/site-packages/airflow/jobs/
