Skip to content

Commit

Permalink
change batch size to 5000
Browse files Browse the repository at this point in the history
  • Loading branch information
Seb Martin committed Apr 9, 2019
1 parent 4196b38 commit ff74a46
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pyoozie/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def _jobs_query(self, type_enum, user=None, name=None, status=None, limit=0, det
job_type, result_type = self.JOB_TYPE_STRINGS[type_enum]
filters = self._filter_string(type_enum, user=user, name=name, status=status)
offset = 1
chunk = limit if limit else 500
chunk = min(limit or 5000, 5000)
jobs = []
while True:
result = self._get('jobs?jobtype={}{}&offset={}&len={}'.format(job_type, filters, offset, chunk))
Expand Down
79 changes: 55 additions & 24 deletions tests/pyoozie/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,27 +293,27 @@ def test_jobs_query_workflow_parameters(self, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Workflow)
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, user='john_doe')
mock_get.assert_called_with('jobs?jobtype=wf&filter=user=john_doe&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=user=john_doe&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, name='my_workflow')
mock_get.assert_called_with('jobs?jobtype=wf&filter=name=my_workflow&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=name=my_workflow&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.RUNNING)
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Workflow, status=model.WorkflowStatus.running())
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING;status=SUSPENDED&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&filter=status=RUNNING;status=SUSPENDED&offset=1&len=5000')

api._jobs_query(
model.ArtifactType.Workflow,
user='john_doe',
name='my_workflow',
status=model.WorkflowStatus.running())
mock_get.assert_called_with('jobs?jobtype=wf&filter=user=john_doe;name=my_workflow;status=RUNNING;'
'status=SUSPENDED&offset=1&len=500')
'status=SUSPENDED&offset=1&len=5000')

def test_jobs_query_coordinator_parameters(self, api):
mock_result = {
Expand All @@ -324,20 +324,20 @@ def test_jobs_query_coordinator_parameters(self, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Coordinator)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, user='john_doe')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=user=john_doe&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=user=john_doe&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, name='my_coordinator')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=name=my_coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=name=my_coordinator&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.RUNNING)
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=status=RUNNING&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=status=RUNNING&offset=1&len=5000')

api._jobs_query(model.ArtifactType.Coordinator, status=model.CoordinatorStatus.running())
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=status=RUNNING;status=RUNNINGWITHERROR;'
'status=SUSPENDED;status=SUSPENDEDWITHERROR&offset=1&len=500')
'status=SUSPENDED;status=SUSPENDEDWITHERROR&offset=1&len=5000')

api._jobs_query(
model.ArtifactType.Coordinator,
Expand All @@ -346,7 +346,7 @@ def test_jobs_query_coordinator_parameters(self, api):
status=model.CoordinatorStatus.running())
mock_get.assert_called_with('jobs?jobtype=coordinator&filter=user=john_doe;name=my_coordinator;'
'status=RUNNING;status=RUNNINGWITHERROR;status=SUSPENDED;'
'status=SUSPENDEDWITHERROR&offset=1&len=500')
'status=SUSPENDEDWITHERROR&offset=1&len=5000')

def test_jobs_query_bad_parameters(self, api):
with pytest.raises(KeyError) as err:
Expand All @@ -362,11 +362,11 @@ def test_jobs_query_workflow_pagination(self, _, api):
mock_results = iter(
[
{
'total': 501,
'total': 5001,
'workflows': [{'id': '1-W'}, {'id': '2-W'}]
},
{
'total': 501,
'total': 5001,
'workflows': [{'id': '3-W'}]
}
]
Expand All @@ -375,8 +375,8 @@ def test_jobs_query_workflow_pagination(self, _, api):
mock_get.side_effect = lambda url: next(mock_results)
result = api._jobs_query(model.ArtifactType.Workflow)
assert len(result) == 3
mock_get.assert_any_call('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_any_call('jobs?jobtype=wf&offset=501&len=500')
mock_get.assert_any_call('jobs?jobtype=wf&offset=1&len=5000')
mock_get.assert_any_call('jobs?jobtype=wf&offset=5001&len=5000')
with pytest.raises(StopIteration):
next(mock_results)

Expand All @@ -385,11 +385,11 @@ def test_jobs_query_coordinator_pagination(self, _, api):
mock_results = iter(
[
{
'total': 501,
'total': 5001,
'coordinatorjobs': [{'coordJobId': '1-C'}, {'coordJobId': '2-C'}]
},
{
'total': 501,
'total': 5001,
'coordinatorjobs': [{'coordJobId': '3-C'}]
}
]
Expand All @@ -399,11 +399,42 @@ def test_jobs_query_coordinator_pagination(self, _, api):
mock_get.side_effect = lambda url: next(mock_results)
result = api._jobs_query(model.ArtifactType.Coordinator)
assert len(result) == 3
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=501&len=500')
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=1&len=5000')
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=5001&len=5000')
with pytest.raises(StopIteration):
next(mock_results)

@mock.patch.object(model.Coordinator, 'fill_in_details', side_effect=lambda c: c, autospec=True)
def test_jobs_query_coordinator_limit(self, _, api):
# mock_result = {'total': 1, 'coordinatorjobs': [{'coordJobId': '3-C'}]}
mock_results = iter(
[
{
'total': 2,
'coordinatorjobs': [{'coordJobId': '1-C'}, {'coordJobId': '2-C'}]
},
{
'total': 5001,
'coordinatorjobs': [{'coordJobId': '1-C'}, {'coordJobId': '2-C'}]
},
{
'total': 5001,
'coordinatorjobs': [{'coordJobId': '3-C'}]
}
]
)

with mock.patch.object(api, '_get') as mock_get:
mock_get.side_effect = lambda url: next(mock_results)
api._jobs_query(model.ArtifactType.Coordinator, limit=5)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5')
api._jobs_query(model.ArtifactType.Coordinator, limit=6000)
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=1&len=5000')
mock_get.assert_any_call('jobs?jobtype=coordinator&offset=5001&len=5000')
with pytest.raises(StopIteration):
next(mock_results)


@mock.patch.object(model.Workflow, 'fill_in_details', side_effect=lambda c: c, autospec=True)
def test_jobs_query_workflow_details(self, fill_in_details, api):
mock_result = {
Expand All @@ -414,11 +445,11 @@ def test_jobs_query_workflow_details(self, fill_in_details, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Workflow, details=False)
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=5000')
assert not fill_in_details.called

api._jobs_query(model.ArtifactType.Workflow, details=True)
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=wf&offset=1&len=5000')
assert fill_in_details.called

@mock.patch.object(model.Coordinator, 'fill_in_details', side_effect=lambda c: c, autospec=True)
Expand All @@ -431,11 +462,11 @@ def test_jobs_query_coordinator_details(self, fill_in_details, api):
mock_get.return_value = mock_result

api._jobs_query(model.ArtifactType.Coordinator, details=False)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5000')
assert not fill_in_details.called

api._jobs_query(model.ArtifactType.Coordinator, details=True)
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=500')
mock_get.assert_called_with('jobs?jobtype=coordinator&offset=1&len=5000')
assert fill_in_details.called

def test_jobs_all_workflows(self, api, sample_workflow_running):
Expand Down

0 comments on commit ff74a46

Please sign in to comment.