Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that dag_run conf is a dict #15057

Merged
merged 12 commits into from Jun 22, 2021
6 changes: 6 additions & 0 deletions airflow/www/api/experimental/endpoints.py
Expand Up @@ -88,6 +88,12 @@ def trigger_dag(dag_id):
conf = None
if 'conf' in data:
conf = data['conf']
if not isinstance(conf, dict):
error_message = 'Dag Run conf must be a dictionary object, other types are not supported'
log.error(error_message)
response = jsonify({'error': error_message})
response.status_code = 400
return response

execution_date = None
if 'execution_date' in data and data['execution_date'] is not None:
Expand Down
2 changes: 1 addition & 1 deletion airflow/www/templates/airflow/trigger.html
Expand Up @@ -35,7 +35,7 @@ <h2>Trigger DAG: {{ dag_id }}</h2>
<input type="hidden" name="dag_id" value="{{ dag_id }}">
<input type="hidden" name="origin" value="{{ origin }}">
<div class="form-group">
<label for="conf">Configuration JSON (Optional)</label>
<label for="conf">Configuration JSON (Optional, must be a dict object)</label>
<textarea class="form-control" name="conf" id="json">{{ conf }}</textarea>
</div>
<p>
Expand Down
7 changes: 6 additions & 1 deletion airflow/www/views.py
Expand Up @@ -1519,8 +1519,13 @@ def trigger(self, session=None):
if request_conf:
try:
run_conf = json.loads(request_conf)
if not isinstance(conf, dict):
flash("Invalid JSON configuration, must be a dict", "error")
return self.render_template(
'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf
)
except json.decoder.JSONDecodeError:
flash("Invalid JSON configuration", "error")
flash("Invalid JSON configuration, not parseable", "error")
return self.render_template(
'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=request_conf
)
Expand Down
20 changes: 20 additions & 0 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Expand Up @@ -945,6 +945,26 @@ def test_should_response_400_for_naive_datetime_and_bad_datetime(self, data, exp
assert response.status_code == 400
assert response.json['detail'] == expected

@parameterized.expand(
[
(
{
"dag_run_id": "TEST_DAG_RUN",
"execution_date": "2020-06-11T18:00:00+00:00",
"conf": "some string",
},
"'some string' is not of type 'object' - 'conf'",
)
]
)
def test_should_response_400_for_non_dict_dagrun_conf(self, data, expected):
self._create_dag("TEST_DAG_ID")
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns", json=data, environ_overrides={'REMOTE_USER': "test"}
)
assert response.status_code == 400
assert response.json['detail'] == expected

def test_response_404(self):
response = self.client.post(
"api/v1/dags/TEST_DAG_ID/dagRuns",
Expand Down
24 changes: 17 additions & 7 deletions tests/www/api/experimental/test_endpoints.py
Expand Up @@ -148,9 +148,25 @@ def test_dag_paused(self):
def test_trigger_dag(self):
url_template = '/api/experimental/dags/{}/dag_runs'
run_id = 'my_run' + utcnow().isoformat()

# Test error for nonexistent dag
response = self.client.post(
url_template.format('does_not_exist_dag'), data=json.dumps({}), content_type="application/json"
)
assert 404 == response.status_code

# Test error for bad conf data
response = self.client.post(
url_template.format('example_bash_operator'),
data=json.dumps({'run_id': run_id}),
data=json.dumps({'conf': 'This is a string not a dict'}),
content_type="application/json",
)
assert 400 == response.status_code

# Test OK case
response = self.client.post(
url_template.format('example_bash_operator'),
data=json.dumps({'run_id': run_id, 'conf': {'param': 'value'}}),
content_type="application/json",
)
self.assert_deprecated(response)
Expand All @@ -168,12 +184,6 @@ def test_trigger_dag(self):
assert run_id == dag_run_id
assert dag_run_id == response['run_id']

# Test error for nonexistent dag
response = self.client.post(
url_template.format('does_not_exist_dag'), data=json.dumps({}), content_type="application/json"
)
assert 404 == response.status_code

def test_trigger_dag_for_date(self):
url_template = '/api/experimental/dags/{}/dag_runs'
dag_id = 'example_bash_operator'
Expand Down
11 changes: 11 additions & 0 deletions tests/www/views/test_views_trigger_dag.py
Expand Up @@ -78,6 +78,17 @@ def test_trigger_dag_conf_malformed(admin_client):
assert run is None


def test_trigger_dag_conf_not_dict(admin_client):
test_dag_id = "example_bash_operator"

response = admin_client.post(f'trigger?dag_id={test_dag_id}', data={'conf': 'string and not a dict'})
check_content_in_response('must be a dict', response)

with create_session() as session:
run = session.query(DagRun).filter(DagRun.dag_id == test_dag_id).first()
assert run is None


def test_trigger_dag_form(admin_client):
test_dag_id = "example_bash_operator"
resp = admin_client.get(f'trigger?dag_id={test_dag_id}')
Expand Down