forked from apache/airflow
-
Notifications
You must be signed in to change notification settings - Fork 0
/
integration_test.py
81 lines (61 loc) · 2.43 KB
/
integration_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from marquez_client import MarquezClient
import logging
import os
import pytest
import requests
import subprocess
import sys
from urllib3.util.retry import Retry
def test_data_in_marquez(wait_for_marquez, init_airflow_db):
    """Backfill the test DAG and verify its metadata is visible in Marquez.

    Requests the ``wait_for_marquez`` and ``init_airflow_db`` fixtures,
    runs the DAG through the airflow CLI helpers, and then checks that
    both the namespace and the job can be fetched through
    ``MarquezClient`` under the expected names.
    """
    namespace = "integration-test"
    dag_id = "test_dag_v2"
    execution_date = "2019-02-01T00:00:00"

    client = MarquezClient(namespace_name=namespace)

    # The DAG has to run and finish successfully before Marquez is queried.
    assert trigger_dag(dag_id, execution_date)
    assert check_dag_state(dag_id, execution_date)

    namespace_info = client.get_namespace(namespace)
    assert namespace_info and namespace_info['name'] == namespace

    expected_job = "test_dag_v2"
    job_info = client.get_job(expected_job)
    assert job_info and job_info['name'] == expected_job
def trigger_dag(dag_id, execution_date):
    """Run ``airflow backfill`` for *dag_id* with ``-s`` *execution_date*.

    Returns True when the CLI process finished with a zero return code.
    """
    backfill_args = ['backfill', dag_id, '-s', execution_date]
    completed = airflow_cli(backfill_args)
    return not completed.returncode
def check_dag_state(dag_id, execution_date):
    """Query ``airflow dag_state`` for one run of *dag_id*.

    Returns True only when the CLI process finished with a zero return
    code and the last line of its standard output is ``success``.
    """
    completed = airflow_cli(['dag_state', dag_id, execution_date])
    # Only the final line of the output is compared against the state name;
    # anything printed before it is ignored.
    output_text = completed.stdout.decode('utf8').strip()
    last_line = output_text.split('\n')[-1]
    if completed.returncode:
        return False
    return last_line == 'success'
def airflow_cli(args):
    """Invoke the ``airflow`` executable with *args* and capture its output.

    *args* is a list of command-line arguments appended after ``airflow``.
    Anything the process wrote to stderr is logged at error level.
    Returns the object produced by ``subprocess.run``, with ``stdout``
    and ``stderr`` captured as bytes.
    """
    completed = subprocess.run(
        ['airflow'] + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    captured_err = completed.stderr
    if captured_err:
        logging.error(captured_err)
    return completed
@pytest.fixture(scope="module")
def init_airflow_db():
    """Module-scoped fixture that runs ``airflow initdb``.

    The fixture value is True when the command finished with a zero
    return code.
    """
    exit_status = airflow_cli(['initdb']).returncode
    return not exit_status
@pytest.fixture(scope="module")
def wait_for_marquez():
    """Module-scoped fixture that probes the Marquez ``/ping`` endpoint.

    The URL is built from the ``MARQUEZ_HOST`` and ``MARQUEZ_PORT``
    environment variables; a ``KeyError`` is raised if either is unset.
    The GET request goes through an HTTP adapter configured with
    ``Retry(total=5, backoff_factor=0.5)``, so failed attempts are
    retried before any exception from ``requests`` propagates.
    """
    url = 'http://{}:{}/ping'.format(os.environ['MARQUEZ_HOST'],
                                     os.environ['MARQUEZ_PORT'])
    retry = Retry(total=5, backoff_factor=0.5)
    adapter = requests.adapters.HTTPAdapter(max_retries=retry)
    # Use the session as a context manager so its pooled connections are
    # released as soon as the probe finishes instead of being leaked.
    with requests.Session() as session:
        session.mount('http://', adapter)
        # NOTE(review): the response status code is not inspected here, so
        # any HTTP reply counts as "Marquez is up" - confirm that is intended.
        session.get(url)
if __name__ == "__main__":
    # Run pytest against this script and propagate its exit code, so that a
    # failing test run makes the process exit non-zero instead of always 0.
    sys.exit(pytest.main([sys.argv[0]]))