Skip to content

Commit

Permalink
adding in tests for when the ti.pid is None
Browse files Browse the repository at this point in the history
  • Loading branch information
krcrouse authored and potiuk committed Jul 7, 2022
1 parent d4a44bd commit 2ba30e3
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions tests/jobs/test_local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ def test_localtaskjob_heartbeat(self, dag_maker):
with pytest.raises(AirflowException):
job1.heartbeat_callback()

# Now, set the ti.pid to None and test that no error
# is raised.
ti.pid = None
session.merge(ti)
session.commit()
assert ti.pid != job1.task_runner.process.pid
assert not ti.run_as_user
assert not job1.task_runner.run_as_user
job1.heartbeat_callback()

@mock.patch('subprocess.check_call')
@mock.patch('airflow.jobs.local_task_job.psutil')
def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker):
Expand Down Expand Up @@ -196,6 +206,16 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

# Here we set the ti.pid to None and test that no error is
# raised
ti.pid = None
session.merge(ti)
session.commit()
assert ti.run_as_user
assert job1.task_runner.run_as_user == ti.run_as_user
assert ti.pid != job1.task_runner.process.pid
job1.heartbeat_callback()

@conf_vars({('core', 'default_impersonation'): 'testuser'})
@mock.patch('subprocess.check_call')
@mock.patch('airflow.jobs.local_task_job.psutil')
Expand Down Expand Up @@ -239,6 +259,16 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _,
with pytest.raises(AirflowException, match='PID of job runner does not match'):
job1.heartbeat_callback()

# Now, set the ti.pid to None and test that no error
# is raised.
ti.pid = None
session.merge(ti)
session.commit()
assert job1.task_runner.run_as_user == 'testuser'
assert ti.run_as_user is None
assert ti.pid != job1.task_runner.process.pid
job1.heartbeat_callback()

def test_heartbeat_failed_fast(self):
"""
Test that task heartbeat will sleep when it fails fast
Expand Down

0 comments on commit 2ba30e3

Please sign in to comment.