Skip to content

Commit

Permalink
Merge pull request #72 from Shopify/perf-improvement
Browse files Browse the repository at this point in the history
Allow specifying details flag to avoid extra REST calls
  • Loading branch information
sebmartin committed Apr 11, 2019
2 parents e33a323 + 0129272 commit a9636f8
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 57 deletions.
34 changes: 19 additions & 15 deletions pyoozie/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def _jobs_query(self, type_enum, user=None, name=None, status=None, limit=0, det
job_type, result_type = self.JOB_TYPE_STRINGS[type_enum]
filters = self._filter_string(type_enum, user=user, name=name, status=status)
offset = 1
chunk = limit if limit else 500
chunk = min(limit or 5000, 5000)
jobs = []
while True:
result = self._get('jobs?jobtype={}{}&offset={}&len={}'.format(job_type, filters, offset, chunk))
Expand All @@ -256,15 +256,17 @@ def _jobs_query(self, type_enum, user=None, name=None, status=None, limit=0, det
def jobs_all_workflows(self, name=None, user=None, limit=0):
return self._jobs_query(model.ArtifactType.Workflow, name=name, user=user, limit=limit)

def jobs_all_active_workflows(self, user=None):
return self._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.active(), user=user)
def jobs_all_active_workflows(self, user=None, details=True):
return self._jobs_query(
model.ArtifactType.Workflow, status=model.WorkflowStatus.active(), user=user, details=details)

def jobs_all_running_workflows(self, user=None):
return self._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.running(), user=user)
def jobs_all_running_workflows(self, user=None, details=True):
return self._jobs_query(
model.ArtifactType.Workflow, status=model.WorkflowStatus.running(), user=user, details=details)

def jobs_running_workflows(self, name, user=None):
def jobs_running_workflows(self, name, user=None, details=True):
return self._jobs_query(
model.ArtifactType.Workflow, name=name, status=model.WorkflowStatus.running(), user=user)
model.ArtifactType.Workflow, name=name, status=model.WorkflowStatus.running(), user=user, details=details)

def jobs_last_workflow(self, name, user=None):
jobs = self._jobs_query(model.ArtifactType.Workflow, name=name, user=user, limit=1)
Expand All @@ -273,18 +275,20 @@ def jobs_last_workflow(self, name, user=None):
else:
raise exceptions.OozieException.workflow_not_found(name)

def jobs_workflow_names(self, user=None):
jobs = self._jobs_query(model.ArtifactType.Workflow, user=user, details=False)
def jobs_workflow_names(self, user=None, limit=0):
jobs = self._jobs_query(model.ArtifactType.Workflow, user=user, details=False, limit=limit)
return set([job.appName for job in jobs])

def jobs_all_coordinators(self, name=None, user=None, limit=0):
return self._jobs_query(model.ArtifactType.Coordinator, name=name, user=user, limit=limit)
def jobs_all_coordinators(self, name=None, user=None, limit=0, details=True):
return self._jobs_query(model.ArtifactType.Coordinator, name=name, user=user, limit=limit, details=details)

def jobs_all_active_coordinators(self, user=None):
return self._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.active(), user=user)
def jobs_all_active_coordinators(self, user=None, details=True):
return self._jobs_query(
model.ArtifactType.Coordinator, status=model.CoordinatorStatus.active(), user=user, details=details)

def jobs_all_running_coordinators(self, user=None):
return self._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.running(), user=user)
def jobs_all_running_coordinators(self, user=None, details=True):
return self._jobs_query(
model.ArtifactType.Coordinator, status=model.CoordinatorStatus.running(), user=user, details=details)

def jobs_all_suspended_coordinators(self, user=None):
return self._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.suspended(), user=user)
Expand Down
112 changes: 70 additions & 42 deletions tests/pyoozie/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,27 +293,27 @@ def test_jobs_query_workflow_parameters(self, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Workflow)
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, user='john_doe')
mock_get.assert_called_with('jobs?jobtype=wf&filter=user=john_doe&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=user=john_doe&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, name='my_workflow')
mock_get.assert_called_with('jobs?jobtype=wf&filter=name=my_workflow&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=name=my_workflow&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.RUNNING)
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.running())
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING;status=SUSPENDED&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING;status=SUSPENDED&offset=1&len=5000')

api._jobs_query(
model.ArtifactType.Workflow,
user='john_doe',
name='my_workflow',
status=model.WorkflowStatus.running())
mock_get.assert_called_with('jobs?jobtype=wf&filter=user=john_doe;name=my_workflow;status=RUNNING;'
'status=SUSPENDED&offset=1&len=500')
'status=SUSPENDED&offset=1&len=5000')

def test_jobs_query_coordinator_parameters(self, api):
mock_result = {
Expand All @@ -324,20 +324,20 @@ def test_jobs_query_coordinator_parameters(self, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Coordinator)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, user='john_doe')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=user=john_doe&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=user=john_doe&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, name='my_coordinator')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=name=my_coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=name=my_coordinator&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.RUNNING)
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=status=RUNNING&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=status=RUNNING&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.running())
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=status=RUNNING;status=RUNNINGWITHERROR;'
'status=SUSPENDED;status=SUSPENDEDWITHERROR&offset=1&len=500')
'status=SUSPENDED;status=SUSPENDEDWITHERROR&offset=1&len=5000')

api._jobs_query(
model.ArtifactType.Coordinator,
Expand All @@ -346,7 +346,7 @@ def test_jobs_query_coordinator_parameters(self, api):
status=model.CoordinatorStatus.running())
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=user=john_doe;name=my_coordinator;'
'status=RUNNING;status=RUNNINGWITHERROR;status=SUSPENDED;'
'status=SUSPENDEDWITHERROR&offset=1&len=500')
'status=SUSPENDEDWITHERROR&offset=1&len=5000')

def test_jobs_query_bad_parameters(self, api):
with pytest.raises(KeyError) as err:
Expand All @@ -362,11 +362,11 @@ def test_jobs_query_workflow_pagination(self, _, api):
mock_results = iter(
[
{
'total': 501,
'total': 5001,
'workflows': [{'id': '1-W'}, {'id': '2-W'}]
},
{
'total': 501,
'total': 5001,
'workflows': [{'id': '3-W'}]
}
]
Expand All @@ -375,34 +375,36 @@ def test_jobs_query_workflow_pagination(self, _, api):
mock_get.side_effect = lambda url: next(mock_results)
result = api._jobs_query(model.ArtifactType.Workflow)
assert len(result) == 3
mock_get.assert_any_call('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_any_call('jobs?jobtype=wf&offset=501&len=500')
mock_get.assert_any_call('jobs?jobtype=wf&offset=1&len=5000')
mock_get.assert_any_call('jobs?jobtype=wf&offset=5001&len=5000')
with pytest.raises(StopIteration):
next(mock_results)

@pytest.mark.parametrize('limit, expected_result_count, expected_queries', [
(0, 3, ['jobs?jobtype=coordinator&offset=1&len=5000', 'jobs?jobtype=coordinator&offset=5001&len=5000']),
(2, 2, ['jobs?jobtype=coordinator&offset=1&len=2']),
(6000, 3, ['jobs?jobtype=coordinator&offset=1&len=5000', 'jobs?jobtype=coordinator&offset=5001&len=5000'])
])
@mock.patch.object(model.Coordinator, 'fill_in_details', side_effect=lambda c: c, autospec=True)
def test_jobs_query_coordinator_pagination(self, _, api):
def test_jobs_query_coordinator_pagination(self, _, limit, expected_result_count, expected_queries, api):
mock_results = iter(
[
{
'total': 501,
'total': 5001,
'coordinatorjobs': [{'coordJobId': '1-C'}, {'coordJobId': '2-C'}]
},
{
'total': 501,
'total': 5001,
'coordinatorjobs': [{'coordJobId': '3-C'}]
}
]
)

with mock.patch.object(api, '_get') as mock_get:
mock_get.side_effect = lambda url: next(mock_results)
result = api._jobs_query(model.ArtifactType.Coordinator)
assert len(result) == 3
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=501&len=500')
with pytest.raises(StopIteration):
next(mock_results)
result = api._jobs_query(model.ArtifactType.Coordinator, limit=limit)
assert len(result) == expected_result_count
mock_get.assert_has_calls(mock.call(query) for query in expected_queries)

@mock.patch.object(model.Workflow, 'fill_in_details', side_effect=lambda c: c, autospec=True)
def test_jobs_query_workflow_details(self, fill_in_details, api):
Expand All @@ -414,11 +416,11 @@ def test_jobs_query_workflow_details(self, fill_in_details, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Workflow, details=False)
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=5000')
assert not fill_in_details.called

api._jobs_query(model.ArtifactType.Workflow, details=True)
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=5000')
assert fill_in_details.called

@mock.patch.object(model.Coordinator, 'fill_in_details', side_effect=lambda c: c, autospec=True)
Expand All @@ -431,11 +433,11 @@ def test_jobs_query_coordinator_details(self, fill_in_details, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Coordinator, details=False)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5000')
assert not fill_in_details.called

api._jobs_query(model.ArtifactType.Coordinator, details=True)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5000')
assert fill_in_details.called

def test_jobs_all_workflows(self, api, sample_workflow_running):
Expand Down Expand Up @@ -463,21 +465,29 @@ def test_jobs_all_active_workflows(self, api, sample_workflow_running):
mock_query.return_value = [sample_workflow_running]

api.jobs_all_active_workflows()
mock_query.assert_called_with(model.ArtifactType.Workflow, user=None, status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Workflow, details=True, user=None, status=expected_statuses
)

api.jobs_all_active_workflows(user='john_doe')
mock_query.assert_called_with(model.ArtifactType.Workflow, user='john_doe', status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Workflow, details=True, user='john_doe', status=expected_statuses
)

def test_jobs_all_running_workflows(self, api, sample_workflow_running):
expected_statuses = model.WorkflowStatus.running()
with mock.patch.object(api, '_jobs_query') as mock_query:
mock_query.return_value = [sample_workflow_running]

api.jobs_all_running_workflows()
mock_query.assert_called_with(model.ArtifactType.Workflow, user=None, status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Workflow, details=True, user=None, status=expected_statuses
)

api.jobs_all_running_workflows(user='john_doe')
mock_query.assert_called_with(model.ArtifactType.Workflow, user='john_doe', status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Workflow, details=True, user='john_doe', status=expected_statuses
)

def test_jobs_running_workflows(self, api, sample_workflow_running):
expected_statuses = model.WorkflowStatus.running()
Expand All @@ -487,13 +497,15 @@ def test_jobs_running_workflows(self, api, sample_workflow_running):
api.jobs_running_workflows('my_workflow')
mock_query.assert_called_with(
model.ArtifactType.Workflow,
details=True,
name='my_workflow',
user=None,
status=expected_statuses)

api.jobs_running_workflows('my_workflow', user='john_doe')
mock_query.assert_called_with(
model.ArtifactType.Workflow,
details=True,
name='my_workflow',
user='john_doe',
status=expected_statuses)
Expand All @@ -513,34 +525,42 @@ def test_jobs_workflow_names_parameters(self, api):
mock_query.return_value = []

api.jobs_workflow_names()
mock_query.assert_called_with(model.ArtifactType.Workflow, user=None, details=False)
mock_query.assert_called_with(model.ArtifactType.Workflow, user=None, details=False, limit=0)

api.jobs_workflow_names(user='john_doe')
mock_query.assert_called_with(model.ArtifactType.Workflow, user='john_doe', details=False)
mock_query.assert_called_with(model.ArtifactType.Workflow, user='john_doe', details=False, limit=0)

def test_jobs_all_coordinators(self, api, sample_coordinator_running):
with mock.patch.object(api, '_jobs_query') as mock_query:
mock_query.return_value = [sample_coordinator_running]

api.jobs_all_coordinators()
mock_query.assert_called_with(model.ArtifactType.Coordinator, name=None, user=None, limit=0)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, name=None, user=None, limit=0
)

api.jobs_all_coordinators(name='my_coordinator')
mock_query.assert_called_with(model.ArtifactType.Coordinator, name='my_coordinator', user=None, limit=0)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, name='my_coordinator', user=None, limit=0
)

api.jobs_all_coordinators(user='john_doe')
mock_query.assert_called_with(model.ArtifactType.Coordinator, name=None, user='john_doe', limit=0)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, name=None, user='john_doe', limit=0
)

api.jobs_all_coordinators(name='my_coordinator', user='john_doe')
mock_query.assert_called_with(
model.ArtifactType.Coordinator,
details=True,
name='my_coordinator',
user='john_doe',
limit=0)

api.jobs_all_coordinators(name='my_coordinator', limit=1)
mock_query.assert_called_with(
model.ArtifactType.Coordinator,
details=True,
name='my_coordinator',
user=None,
limit=1)
Expand All @@ -551,21 +571,29 @@ def test_jobs_all_active_coordinators(self, api, sample_coordinator_running):
mock_query.return_value = [sample_coordinator_running]

api.jobs_all_active_coordinators()
mock_query.assert_called_with(model.ArtifactType.Coordinator, user=None, status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, user=None, status=expected_statuses
)

api.jobs_all_active_coordinators(user='john_doe')
mock_query.assert_called_with(model.ArtifactType.Coordinator, user='john_doe', status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, user='john_doe', status=expected_statuses
)

def test_jobs_all_running_coordinators(self, api, sample_coordinator_running):
expected_statuses = model.CoordinatorStatus.running()
with mock.patch.object(api, '_jobs_query') as mock_query:
mock_query.return_value = [sample_coordinator_running]

api.jobs_all_running_coordinators()
mock_query.assert_called_with(model.ArtifactType.Coordinator, user=None, status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, user=None, status=expected_statuses
)

api.jobs_all_running_coordinators(user='john_doe')
mock_query.assert_called_with(model.ArtifactType.Coordinator, user='john_doe', status=expected_statuses)
mock_query.assert_called_with(
model.ArtifactType.Coordinator, details=True, user='john_doe', status=expected_statuses
)

def test_jobs_all_suspended_coordinators(self, api, sample_coordinator_suspended):
expected_statuses = model.CoordinatorStatus.suspended()
Expand Down

0 comments on commit a9636f8

Please sign in to comment.