Skip to content

Commit

Permalink
Fix failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mhenc committed Mar 8, 2022
1 parent c970cdc commit 33b1d20
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 9 deletions.
2 changes: 1 addition & 1 deletion airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ def _run_parsing_loop(self):
loop_start_time = time.monotonic()
ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
if self._signal_conn in ready:
if not conf.getboolean("scheduler", "standalone_dag_processor"):
if conf.getboolean("scheduler", "standalone_dag_processor"):
# Nothing to do if callbacks are not stored in the database
self._fetch_callbacks(maxCallbacksPerLoop)
agent_signal = self._signal_conn.recv()
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/local_kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def queue_task_instance(
cfg_path: Optional[str] = None,
) -> None:
"""Queues task instance via local or kubernetes executor"""
executor = self._router(SimpleTaskInstance(task_instance))
executor = self._router(SimpleTaskInstance.from_ti(task_instance))
self.log.debug(
"Using executor: %s to queue_task_instance for %s", executor.__class__.__name__, task_instance.key
)
Expand Down
26 changes: 19 additions & 7 deletions tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,8 +665,12 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmpdir):
# assert code not deleted
assert DagCode.has_dag(dag.fileloc)

@conf_vars({('scheduler', 'standalone_dag_processor'): 'True'})
@conf_vars({('core', 'load_examples'): 'False'})
@conf_vars(
{
('core', 'load_examples'): 'False',
('scheduler', 'standalone_dag_processor'): 'True',
}
)
def test_fetch_callbacks_from_database(self, tmpdir):
"""Test DagFileProcessorManager._fetch_callbacks method"""
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
Expand Down Expand Up @@ -713,9 +717,13 @@ def test_fetch_callbacks_from_database(self, tmpdir):
with create_session() as session:
assert session.query(DbCallbackRequest).count() == 0

@conf_vars({('scheduler', 'standalone_dag_processor'): 'True'})
@conf_vars({('scheduler', 'max_callbacks_per_loop'): '2'})
@conf_vars({('core', 'load_examples'): 'False'})
@conf_vars(
{
('scheduler', 'standalone_dag_processor'): 'True',
('scheduler', 'max_callbacks_per_loop'): '2',
('core', 'load_examples'): 'False',
}
)
def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
"""Test DagFileProcessorManager._fetch_callbacks method"""
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
Expand Down Expand Up @@ -751,8 +759,12 @@ def test_fetch_callbacks_from_database_max_per_loop(self, tmpdir):
assert (len(results)) == 2
assert session.query(DbCallbackRequest).count() == 1

@conf_vars({('scheduler', 'standalone_dag_processor'): 'False'})
@conf_vars({('core', 'load_examples'): 'False'})
@conf_vars(
{
('scheduler', 'standalone_dag_processor'): 'False',
('core', 'load_examples'): 'False',
}
)
def test_fetch_callbacks_from_database_not_standalone(self, tmpdir):
dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"

Expand Down

0 comments on commit 33b1d20

Please sign in to comment.