Skip to content

Commit

Permalink
Parse job_run() payload properly (apache#31)
Browse files Browse the repository at this point in the history
  • Loading branch information
roaraya8 committed Jun 3, 2019
1 parent f9748aa commit 739f676
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
15 changes: 9 additions & 6 deletions marquez_airflow/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ def report_jobrun(self, run_args, execution_date):
nominal_start_time=start_time,
nominal_end_time=end_time)

marquez_jobrun_id = str(marquez_jobrun.run_id)

marquez_client.mark_job_run_as_running(marquez_jobrun_id)
self.log_marquez_event(['job_running',
marquez_jobrun_id,
start_time])
marquez_jobrun_id = marquez_jobrun.get('runId')
if marquez_jobrun_id:
marquez_client.mark_job_run_as_running(marquez_jobrun_id)
self.log_marquez_event(['job_running',
marquez_jobrun_id,
start_time])
else:
logging.warning("[Marquez]\tNo 'runId' in payload: {}".format(
marquez_jobrun))
return marquez_jobrun_id

def compute_endtime(self, execution_date):
Expand Down
7 changes: 3 additions & 4 deletions tests/test_dag_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import pendulum
import pytest
import sys


class MockDag:
Expand Down Expand Up @@ -142,10 +143,8 @@ def assert_marquez_calls_for_dagrun(test_dag,


def make_mock_marquez_client(run_id):
mock_marquez_jobrun = Mock()
mock_marquez_jobrun.run_id = run_id
mock_marquez_client = create_autospec(MarquezClient)
mock_marquez_client.create_job_run.return_value = mock_marquez_jobrun
mock_marquez_client.create_job_run.return_value = {'runId': run_id}
return mock_marquez_client


Expand All @@ -157,4 +156,4 @@ def make_mock_airflow_jobrun(dag_id, airflow_run_id):


if __name__ == "__main__":
pytest.main()
pytest.main([sys.argv[0]])

0 comments on commit 739f676

Please sign in to comment.