From 5e21e5c6ccd23745da3f1933cec3a3ca7333263c Mon Sep 17 00:00:00 2001 From: Kevin Crouse Date: Mon, 11 Jul 2022 12:13:16 -0400 Subject: [PATCH] Fix pid check (#24636) (cherry picked from commit 26c9768e44315e08c298776e0d7fb400b442bf96) --- airflow/jobs/local_task_job.py | 8 +++++++- tests/jobs/test_local_task_job.py | 30 ++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/airflow/jobs/local_task_job.py b/airflow/jobs/local_task_job.py index 9b2c3510ebfd0..139d8391cba41 100644 --- a/airflow/jobs/local_task_job.py +++ b/airflow/jobs/local_task_job.py @@ -200,7 +200,13 @@ def heartbeat_callback(self, session=None): recorded_pid = ti.pid same_process = recorded_pid == current_pid - if ti.run_as_user or self.task_runner.run_as_user: + if recorded_pid is not None and (ti.run_as_user or self.task_runner.run_as_user): + # when running as another user, compare the task runner pid to the parent of + # the recorded pid because user delegation becomes an extra process level. + # However, if recorded_pid is None, pass that through as it signals the task + # runner process has already completed and been cleared out. `psutil.Process` + # uses the current process if the parameter is None, which is not what is intended + # for comparison. recorded_pid = psutil.Process(ti.pid).ppid() same_process = recorded_pid == current_pid diff --git a/tests/jobs/test_local_task_job.py b/tests/jobs/test_local_task_job.py index bad4c3c91e5dc..0d6a2c51ca8e3 100644 --- a/tests/jobs/test_local_task_job.py +++ b/tests/jobs/test_local_task_job.py @@ -152,6 +152,16 @@ def test_localtaskjob_heartbeat(self, dag_maker): with pytest.raises(AirflowException): job1.heartbeat_callback() + # Now, set the ti.pid to None and test that no error + # is raised. + ti.pid = None + session.merge(ti) + session.commit() + assert ti.pid != job1.task_runner.process.pid + assert not ti.run_as_user + assert not job1.task_runner.run_as_user + job1.heartbeat_callback() + @mock.patch('subprocess.check_call') @mock.patch('airflow.jobs.local_task_job.psutil') def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker): @@ -194,6 +204,16 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker with pytest.raises(AirflowException, match='PID of job runner does not match'): job1.heartbeat_callback() + # Here we set the ti.pid to None and test that no error is + # raised + ti.pid = None + session.merge(ti) + session.commit() + assert ti.run_as_user + assert job1.task_runner.run_as_user == ti.run_as_user + assert ti.pid != job1.task_runner.process.pid + job1.heartbeat_callback() + @conf_vars({('core', 'default_impersonation'): 'testuser'}) @mock.patch('subprocess.check_call') @mock.patch('airflow.jobs.local_task_job.psutil') @@ -237,6 +257,16 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _, with pytest.raises(AirflowException, match='PID of job runner does not match'): job1.heartbeat_callback() + # Now, set the ti.pid to None and test that no error + # is raised. + ti.pid = None + session.merge(ti) + session.commit() + assert job1.task_runner.run_as_user == 'testuser' + assert ti.run_as_user is None + assert ti.pid != job1.task_runner.process.pid + job1.heartbeat_callback() + def test_heartbeat_failed_fast(self): """ Test that task heartbeat will sleep when it fails fast