From 463913b8fef76843e4c2e1bd4c994a2295dd092c Mon Sep 17 00:00:00 2001
From: Zack Krida
Date: Fri, 8 Oct 2021 23:10:15 -0400
Subject: [PATCH 1/4] init

---
 .../dags/airflow_log_cleanup_workflow.py      |  6 +-----
 ...eck_new_smithsonian_unit_codes_workflow.py |  6 +-----
 openverse_catalog/dags/cleaner_workflow.py    |  6 ++----
 .../dags/common_api_workflows.py              |  7 ++-----
 openverse_catalog/dags/commoncrawl_etl.py     | 10 ++--------
 .../europeana_sub_provider_update_workflow.py |  6 +-----
 .../flickr_sub_provider_update_workflow.py    |  6 +-----
 .../dags/image_expiration_workflow.py         |  6 +-----
 .../recreate_audio_popularity_calculation.py  |  7 +------
 .../recreate_image_popularity_calculation.py  |  7 +------
 .../dags/refresh_all_audio_popularity_data.py | 11 +---------
 .../dags/refresh_all_image_popularity_data.py | 11 +---------
 .../dags/refresh_audio_view_data.py           |  6 +-----
 .../dags/refresh_image_view_data.py           |  6 +-----
 ...mithsonian_sub_provider_update_workflow.py |  6 +-----
 .../dags/sync_commoncrawl_workflow.py         | 14 ++-----------
 openverse_catalog/dags/util/dag_factory.py    | 11 +++-------
 openverse_catalog/dags/util/etl/operators.py  |  9 ---------
 openverse_catalog/dags/util/operator_util.py  |  8 --------
 tests/dags/test_common_api_workflows.py       | 11 +---------
 tests/dags/util/test_dag_factory.py           | 20 +------------------
 21 files changed, 25 insertions(+), 155 deletions(-)

diff --git a/openverse_catalog/dags/airflow_log_cleanup_workflow.py b/openverse_catalog/dags/airflow_log_cleanup_workflow.py
index 0dccdd8094..feb9539479 100644
--- a/openverse_catalog/dags/airflow_log_cleanup_workflow.py
+++ b/openverse_catalog/dags/airflow_log_cleanup_workflow.py
@@ -19,7 +19,6 @@
 from datetime import datetime, timedelta
 
 import jinja2
-import util.operator_util as ops
 from airflow.configuration import conf
 from airflow.models import DAG
 from airflow.operators.python import PythonOperator
@@ -88,14 +87,11 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         run_task = get_log_cleaner_operator(
             dag,
             BASE_LOG_FOLDER,
         )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        run_task
 
     return dag
 
diff --git a/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py b/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
index 0443408829..323bd5e200 100644
--- a/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
+++ b/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -51,11 +50,8 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         run_task = operators.get_smithsonian_unit_code_operator(dag, postgres_conn_id)
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        run_task
 
     return dag
 
diff --git a/openverse_catalog/dags/cleaner_workflow.py b/openverse_catalog/dags/cleaner_workflow.py
index 55267f6979..474b89b228 100644
--- a/openverse_catalog/dags/cleaner_workflow.py
+++ b/openverse_catalog/dags/cleaner_workflow.py
@@ -5,7 +5,7 @@
 
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from util import config, operator_util, pg_cleaner
+from util import config, pg_cleaner
 
 
 logging.basicConfig(
@@ -47,9 +47,7 @@ def create_id_partitioned_cleaner_dag(
             _get_pg_cleaner_operator(dag, prefix, postgres_conn_id)
             for prefix in hex_prefixes
         ]
-        start_task = operator_util.get_log_operator(dag, dag.dag_id, "Started")
-        end_task = operator_util.get_log_operator(dag, dag.dag_id, "Ended")
-        start_task >> cleaner_list >> end_task
+        cleaner_list
     return dag
 
diff --git a/openverse_catalog/dags/common_api_workflows.py b/openverse_catalog/dags/common_api_workflows.py
index b6c49b445a..22e6781602 100644
--- a/openverse_catalog/dags/common_api_workflows.py
+++ b/openverse_catalog/dags/common_api_workflows.py
@@ -4,7 +4,7 @@
 import util.config as conf
 from airflow import DAG
 from croniter import croniter
-from util.operator_util import get_log_operator, get_runner_operator
+from util.operator_util import get_runner_operator
 
 
 logging.basicConfig(
@@ -52,11 +52,8 @@ def create_dag(
     )
 
     with dag:
-        start_task = get_log_operator(dag, source, "starting")
         run_task = get_runner_operator(dag, source, script_location)
-        end_task = get_log_operator(dag, source, "finished")
-
-        start_task >> run_task >> end_task
+        run_task
 
     return dag
 
diff --git a/openverse_catalog/dags/commoncrawl_etl.py b/openverse_catalog/dags/commoncrawl_etl.py
index 0631154a75..ae35f0065e 100644
--- a/openverse_catalog/dags/commoncrawl_etl.py
+++ b/openverse_catalog/dags/commoncrawl_etl.py
@@ -165,9 +165,6 @@
     concurrency=1,
     catchup=False,
 ) as dag:
-
-    job_start_logger = operators.get_log_operator(dag, "Starting")
-
     check_for_cc_index = operators.get_check_cc_index_in_s3_sensor(
         dag,
         AWS_CONN_ID,
@@ -207,15 +204,12 @@
         AWS_CONN_ID,
     )
 
-    job_done_logger = operators.get_log_operator(dag, "Finished")
-
     (
-        job_start_logger
-        >> check_for_cc_index
+        check_for_cc_index
         >> check_for_wat_file
         >> [extract_script_loader, cluster_bootstrap_loader]
         >> job_flow_creator
         >> job_sensor
         >> job_flow_terminator
     )
-    [job_flow_creator, job_sensor, job_flow_terminator] >> job_done_logger
+    [job_flow_creator, job_sensor, job_flow_terminator]
diff --git a/openverse_catalog/dags/europeana_sub_provider_update_workflow.py b/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
index d7b7516d4c..e15c6f2fc8 100644
--- a/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -51,13 +50,10 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         run_task = operators.get_europeana_sub_provider_update_operator(
             dag, postgres_conn_id
         )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        run_task
 
     return dag
 
diff --git a/openverse_catalog/dags/flickr_sub_provider_update_workflow.py b/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
index 86d592b9ac..fcb887d403 100644
--- a/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -51,13 +50,10 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         run_task = operators.get_flickr_sub_provider_update_operator(
             dag, postgres_conn_id
         )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        run_task
 
     return dag
 
diff --git a/openverse_catalog/dags/image_expiration_workflow.py b/openverse_catalog/dags/image_expiration_workflow.py
index 88c3765956..8276c826cc 100644
--- a/openverse_catalog/dags/image_expiration_workflow.py
+++ b/openverse_catalog/dags/image_expiration_workflow.py
@@ -7,7 +7,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators, sql
 
@@ -50,14 +49,11 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         run_task_list = [
             operators.get_image_expiration_operator(dag, postgres_conn_id, provider)
             for provider in sql.OLDEST_PER_PROVIDER
         ]
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task_list >> end_task
+        run_task_list
 
     return dag
 
diff --git a/openverse_catalog/dags/recreate_audio_popularity_calculation.py b/openverse_catalog/dags/recreate_audio_popularity_calculation.py
index 66e1813ef7..62bde8b200 100644
--- a/openverse_catalog/dags/recreate_audio_popularity_calculation.py
+++ b/openverse_catalog/dags/recreate_audio_popularity_calculation.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -54,7 +53,6 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         drop_relations = operators.drop_media_popularity_relations(
             dag,
             postgres_conn_id,
@@ -95,17 +93,14 @@ def create_dag(
             postgres_conn_id,
             "audio",
         )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
 
         (
-            start_task
-            >> [drop_relations, drop_functions]
+            [drop_relations, drop_functions]
             >> create_metrics
             >> [update_metrics, create_percentile]
             >> create_constants
             >> create_popularity
             >> create_db_view
-            >> end_task
         )
 
     return dag
 
diff --git a/openverse_catalog/dags/recreate_image_popularity_calculation.py b/openverse_catalog/dags/recreate_image_popularity_calculation.py
index 48e1db0b17..daae19bd34 100644
--- a/openverse_catalog/dags/recreate_image_popularity_calculation.py
+++ b/openverse_catalog/dags/recreate_image_popularity_calculation.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -54,7 +53,6 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         drop_relations = operators.drop_media_popularity_relations(
             dag,
             postgres_conn_id,
@@ -79,17 +77,14 @@ def create_dag(
             dag, postgres_conn_id
         )
         create_image_view = operators.create_db_view(dag, postgres_conn_id)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
 
         (
-            start_task
-            >> [drop_relations, drop_functions]
+            [drop_relations, drop_functions]
             >> create_metrics
             >> [update_metrics, create_percentile]
             >> create_constants
             >> create_popularity
             >> create_image_view
-            >> end_task
         )
 
     return dag
 
diff --git a/openverse_catalog/dags/refresh_all_audio_popularity_data.py b/openverse_catalog/dags/refresh_all_audio_popularity_data.py
index c1477f874b..dd54107ad0 100644
--- a/openverse_catalog/dags/refresh_all_audio_popularity_data.py
+++ b/openverse_catalog/dags/refresh_all_audio_popularity_data.py
@@ -11,7 +11,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -53,7 +52,6 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         update_metrics = operators.update_media_popularity_metrics(
             dag,
             postgres_conn_id,
@@ -69,15 +67,8 @@ def create_dag(
             postgres_conn_id,
             media_type="audio",
         )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
 
-        (
-            start_task
-            >> update_metrics
-            >> update_constants
-            >> update_image_view
-            >> end_task
-        )
+        (update_metrics >> update_constants >> update_image_view)
 
     return dag
 
diff --git a/openverse_catalog/dags/refresh_all_image_popularity_data.py b/openverse_catalog/dags/refresh_all_image_popularity_data.py
index 22cce4c7a9..45ccdf45b0 100644
--- a/openverse_catalog/dags/refresh_all_image_popularity_data.py
+++ b/openverse_catalog/dags/refresh_all_image_popularity_data.py
@@ -11,7 +11,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -53,7 +52,6 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         update_metrics = operators.update_media_popularity_metrics(
             dag, postgres_conn_id
         )
@@ -61,15 +59,8 @@ def create_dag(
             dag, postgres_conn_id
         )
         update_image_view = operators.update_db_view(dag, postgres_conn_id)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
 
-        (
-            start_task
-            >> update_metrics
-            >> update_constants
-            >> update_image_view
-            >> end_task
-        )
+        (update_metrics >> update_constants >> update_image_view)
 
     return dag
 
diff --git a/openverse_catalog/dags/refresh_audio_view_data.py b/openverse_catalog/dags/refresh_audio_view_data.py
index 871552266a..353f080e41 100644
--- a/openverse_catalog/dags/refresh_audio_view_data.py
+++ b/openverse_catalog/dags/refresh_audio_view_data.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -56,13 +55,10 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         update_audio_view = operators.update_db_view(
             dag, postgres_conn_id, media_type="audio"
         )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
-
-        start_task >> update_audio_view >> end_task
+        update_audio_view
 
     return dag
 
diff --git a/openverse_catalog/dags/refresh_image_view_data.py b/openverse_catalog/dags/refresh_image_view_data.py
index af0285e236..999c7a717d 100644
--- a/openverse_catalog/dags/refresh_image_view_data.py
+++ b/openverse_catalog/dags/refresh_image_view_data.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -56,11 +55,8 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         update_image_view = operators.update_db_view(dag, postgres_conn_id)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
-
-        start_task >> update_image_view >> end_task
+        update_image_view
 
     return dag
 
diff --git a/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py b/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
index edc713b0dd..14bf05e860 100644
--- a/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
@@ -7,7 +7,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -50,13 +49,10 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         run_task = operators.get_smithsonian_sub_provider_update_operator(
             dag, postgres_conn_id
         )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        run_task
 
     return dag
 
diff --git a/openverse_catalog/dags/sync_commoncrawl_workflow.py b/openverse_catalog/dags/sync_commoncrawl_workflow.py
index 2049f630c6..8003263976 100644
--- a/openverse_catalog/dags/sync_commoncrawl_workflow.py
+++ b/openverse_catalog/dags/sync_commoncrawl_workflow.py
@@ -4,7 +4,6 @@
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
-from util.operator_util import get_log_operator
 from util.tsv_cleaner import clean_tsv_directory
 
 
@@ -91,21 +90,12 @@ def create_dag():
     )
 
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         create_dir_task = get_creator_operator(dag)
         sync_tsvs_task = get_syncer_operator(dag)
         clean_tsvs_task = get_cleaner_operator(dag)
         empty_dir_task = get_deleter_operator(dag)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
-
-        (
-            start_task
-            >> create_dir_task
-            >> sync_tsvs_task
-            >> clean_tsvs_task
-            >> empty_dir_task
-            >> end_task
-        )
+
+        (create_dir_task >> sync_tsvs_task >> clean_tsvs_task >> empty_dir_task)
 
     return dag
 
diff --git a/openverse_catalog/dags/util/dag_factory.py b/openverse_catalog/dags/util/dag_factory.py
index 18c2a5ce05..fbfabc8bf5 100644
--- a/openverse_catalog/dags/util/dag_factory.py
+++ b/openverse_catalog/dags/util/dag_factory.py
@@ -73,17 +73,13 @@ def create_provider_api_workflow(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         if dated:
             run_task = ops.get_dated_main_runner_operator(
                 dag, main_function, dagrun_timeout, day_shift=day_shift
             )
         else:
            run_task = ops.get_main_runner_operator(dag, main_function)
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
-
+        run_task
     return dag
 
 
@@ -186,12 +182,11 @@ def create_day_partitioned_ingestion_dag(
         ingest_operator_list_list = _build_ingest_operator_list_list(
             reingestion_day_list_list, dag, main_function, ingestion_task_timeout
         )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
         for i in range(len(ingest_operator_list_list) - 1):
             wait_operator = ops.get_wait_till_done_operator(dag, f"wait_L{i}")
-            cross_downstream(ingest_operator_list_list[i], [wait_operator, end_task])
+            cross_downstream(ingest_operator_list_list[i], [wait_operator])
             wait_operator >> ingest_operator_list_list[i + 1]
-        ingest_operator_list_list[-1] >> end_task
+        ingest_operator_list_list[-1]
 
     return dag
 
diff --git a/openverse_catalog/dags/util/etl/operators.py b/openverse_catalog/dags/util/etl/operators.py
index 725b7eb13d..ddee529477 100644
--- a/openverse_catalog/dags/util/etl/operators.py
+++ b/openverse_catalog/dags/util/etl/operators.py
@@ -1,4 +1,3 @@
-from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.emr_create_job_flow import (
@@ -13,14 +12,6 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 
-def get_log_operator(dag, status):
-    return BashOperator(
-        task_id=f"log_{dag.dag_id}_{status}",
-        bash_command=f"echo {status} {dag.dag_id} at $(date)",
-        trigger_rule=TriggerRule.ALL_SUCCESS,
-    )
-
-
 def get_check_cc_index_in_s3_sensor(dag, aws_conn_id):
     return S3PrefixSensor(
         task_id="check_for_cc_index",
diff --git a/openverse_catalog/dags/util/operator_util.py b/openverse_catalog/dags/util/operator_util.py
index f8f32b57c5..69eb31459f 100644
--- a/openverse_catalog/dags/util/operator_util.py
+++ b/openverse_catalog/dags/util/operator_util.py
@@ -39,13 +39,5 @@ def get_main_runner_operator(dag, main_function):
     )
 
 
-def get_log_operator(dag, source, status):
-    return BashOperator(
-        task_id=f"{source}_{status}",
-        bash_command=f"echo {status} {source} workflow at $(date)",
-        dag=dag,
-    )
-
-
 def get_wait_till_done_operator(dag, task_id):
     return DummyOperator(task_id=task_id, trigger_rule=TriggerRule.ALL_DONE, dag=dag)
diff --git a/tests/dags/test_common_api_workflows.py b/tests/dags/test_common_api_workflows.py
index 0a06fc1480..fb1cef041f 100644
--- a/tests/dags/test_common_api_workflows.py
+++ b/tests/dags/test_common_api_workflows.py
@@ -19,18 +19,9 @@ def test_dags_load_with_no_errors(tmpdir):
 
 def test_create_dag_creates_correct_dependencies():
     dag = caw.create_dag("test_source", "test_script_location", "test_dag_id")
-    start_id = "test_source_starting"
     run_id = "get_test_source_images"
-    finish_id = "test_source_finished"
-    start_task = dag.get_task(start_id)
-    assert start_task.upstream_task_ids == set()
-    assert start_task.downstream_task_ids == set([run_id])
     run_task = dag.get_task(run_id)
-    assert run_task.upstream_task_ids == set([start_id])
-    assert run_task.downstream_task_ids == set([finish_id])
-    finish_task = dag.get_task(finish_id)
-    assert finish_task.upstream_task_ids == set([run_id])
-    assert finish_task.downstream_task_ids == set()
+    assert run_task.downstream_task_ids == set()
 
 
 def test_create_dag_adds_schedule_interval():
diff --git a/tests/dags/util/test_dag_factory.py b/tests/dags/util/test_dag_factory.py
index 50b29ae362..10ecf79e3f 100644
--- a/tests/dags/util/test_dag_factory.py
+++ b/tests/dags/util/test_dag_factory.py
@@ -11,19 +11,13 @@ def test_create_day_partitioned_ingestion_dag_with_single_layer_dependencies():
     wait0_id = "wait_L0"
     ingest1_id = "ingest_1"
     ingest2_id = "ingest_2"
-    finish_id = "test_dag_Finished"
     today_task = dag.get_task(today_id)
     assert today_task.upstream_task_ids == set()
-    assert today_task.downstream_task_ids == set([wait0_id, finish_id])
+    assert today_task.downstream_task_ids == set([wait0_id])
     ingest1_task = dag.get_task(ingest1_id)
     assert ingest1_task.upstream_task_ids == set([wait0_id])
-    assert ingest1_task.downstream_task_ids == set([finish_id])
     ingest2_task = dag.get_task(ingest2_id)
     assert ingest2_task.upstream_task_ids == set([wait0_id])
-    assert ingest2_task.downstream_task_ids == set([finish_id])
-    finish_task = dag.get_task(finish_id)
-    assert finish_task.upstream_task_ids == set([today_id, ingest1_id, ingest2_id])
-    assert finish_task.downstream_task_ids == set()
 
 
 def test_create_day_partitioned_ingestion_dag_with_multi_layer_dependencies():
@@ -40,27 +34,15 @@ def test_create_day_partitioned_ingestion_dag_with_multi_layer_dependencies():
     ingest3_id = "ingest_3"
     ingest4_id = "ingest_4"
     ingest5_id = "ingest_5"
-    finish_id = "test_dag_Finished"
     today_task = dag.get_task(today_id)
     assert today_task.upstream_task_ids == set()
-    assert today_task.downstream_task_ids == set([wait0_id, finish_id])
     ingest1_task = dag.get_task(ingest1_id)
     assert ingest1_task.upstream_task_ids == set([wait0_id])
-    assert ingest1_task.downstream_task_ids == set([wait1_id, finish_id])
     ingest2_task = dag.get_task(ingest2_id)
     assert ingest2_task.upstream_task_ids == set([wait0_id])
-    assert ingest2_task.downstream_task_ids == set([wait1_id, finish_id])
     ingest3_task = dag.get_task(ingest3_id)
     assert ingest3_task.upstream_task_ids == set([wait1_id])
-    assert ingest3_task.downstream_task_ids == set([finish_id])
     ingest4_task = dag.get_task(ingest4_id)
     assert ingest4_task.upstream_task_ids == set([wait1_id])
-    assert ingest4_task.downstream_task_ids == set([finish_id])
     ingest5_task = dag.get_task(ingest5_id)
     assert ingest5_task.upstream_task_ids == set([wait1_id])
-    assert ingest5_task.downstream_task_ids == set([finish_id])
-    finish_task = dag.get_task(finish_id)
-    assert finish_task.upstream_task_ids == set(
-        [today_id, ingest1_id, ingest2_id, ingest3_id, ingest4_id, ingest5_id]
-    )
-    assert finish_task.downstream_task_ids == set()
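[Series note: PATCH 1/4 removes the start/finish logging tasks wholesale. Those tasks were plain BashOperators that only echoed a status string and the DAG id; Airflow already records every task-state transition in its metadata database and task logs, so the `start_task >> run_task >> end_task` chains carried no extra information and only added nodes to every DAG graph.]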
From 7f320c235244ad094233763816b6fccd80ea0462 Mon Sep 17 00:00:00 2001
From: Zack Krida
Date: Sat, 9 Oct 2021 06:39:18 -0400
Subject: [PATCH 2/4] remove dag argument from all the operator creation functions

---
 .../dags/airflow_log_cleanup_workflow.py      |  12 +-
 ...eck_new_smithsonian_unit_codes_workflow.py |   3 +-
 openverse_catalog/dags/cleaner_workflow.py    |   9 +-
 .../dags/common_api_workflows.py              |   3 +-
 openverse_catalog/dags/commoncrawl_etl.py     |   2 -
 .../europeana_sub_provider_update_workflow.py |   5 +-
 .../flickr_sub_provider_update_workflow.py    |   5 +-
 .../dags/image_expiration_workflow.py         |   5 +-
 openverse_catalog/dags/loader_workflow.py     |  18 +--
 .../recreate_audio_popularity_calculation.py  |   8 --
 .../recreate_image_popularity_calculation.py  |  20 +---
 .../dags/refresh_all_audio_popularity_data.py |   3 -
 .../dags/refresh_all_image_popularity_data.py |  10 +-
 .../dags/refresh_audio_view_data.py           |   5 +-
 .../dags/refresh_image_view_data.py           |   3 +-
 ...mithsonian_sub_provider_update_workflow.py |   5 +-
 .../dags/sync_commoncrawl_workflow.py         |  21 ++--
 openverse_catalog/dags/util/dag_factory.py    |  15 ++-
 openverse_catalog/dags/util/etl/operators.py  |   4 +-
 .../dags/util/loader/operators.py             |  62 +++--------
 openverse_catalog/dags/util/operator_util.py  |  12 +-
 .../dags/util/popularity/operators.py         |  22 +---
 tests/dags/util/test_operator_util.py         | 104 +++++++++---------
 23 files changed, 119 insertions(+), 237 deletions(-)

diff --git a/openverse_catalog/dags/airflow_log_cleanup_workflow.py b/openverse_catalog/dags/airflow_log_cleanup_workflow.py
index feb9539479..cf67443cb1 100644
--- a/openverse_catalog/dags/airflow_log_cleanup_workflow.py
+++ b/openverse_catalog/dags/airflow_log_cleanup_workflow.py
@@ -51,10 +51,7 @@
 }
 
 
-def get_log_cleaner_operator(
-    dag,
-    base_log_folder,
-):
+def get_log_cleaner_operator(base_log_folder):
     return PythonOperator(
         task_id="log_cleaner_operator",
         python_callable=log_cleanup.clean_up,
@@ -63,7 +60,6 @@ def get_log_cleaner_operator(
             "{{ params.get('maxLogAgeInDays') }}",
             "{{ params.get('enableDelete') }}",
         ],
-        dag=dag,
     )
 
 
@@ -87,11 +83,7 @@ def create_dag(
     )
 
     with dag:
-        run_task = get_log_cleaner_operator(
-            dag,
-            BASE_LOG_FOLDER,
-        )
-        run_task
+        get_log_cleaner_operator(BASE_LOG_FOLDER)
 
     return dag
 
diff --git a/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py b/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
index 323bd5e200..620f2ea3cb 100644
--- a/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
+++ b/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
@@ -50,8 +50,7 @@ def create_dag(
     )
 
     with dag:
-        run_task = operators.get_smithsonian_unit_code_operator(dag, postgres_conn_id)
-        run_task
+        operators.get_smithsonian_unit_code_operator(postgres_conn_id)
 
     return dag
 
diff --git a/openverse_catalog/dags/cleaner_workflow.py b/openverse_catalog/dags/cleaner_workflow.py
index 474b89b228..ce0ab3d003 100644
--- a/openverse_catalog/dags/cleaner_workflow.py
+++ b/openverse_catalog/dags/cleaner_workflow.py
@@ -43,16 +43,12 @@ def create_id_partitioned_cleaner_dag(
     )
     hex_prefixes = pg_cleaner.hex_counter(prefix_length)
     with dag:
-        cleaner_list = [
-            _get_pg_cleaner_operator(dag, prefix, postgres_conn_id)
-            for prefix in hex_prefixes
-        ]
-        cleaner_list
+        [_get_pg_cleaner_operator(prefix, postgres_conn_id) for prefix in hex_prefixes]
+
     return dag
 
 
 def _get_pg_cleaner_operator(
-    dag,
     prefix,
     postgres_conn_id,
     desired_length=DESIRED_PREFIX_LENGTH,
@@ -68,7 +64,6 @@ def _get_pg_cleaner_operator(
             "delay_minutes": delay,
         },
         depends_on_past=False,
-        dag=dag,
     )
 
diff --git a/openverse_catalog/dags/common_api_workflows.py b/openverse_catalog/dags/common_api_workflows.py
index 22e6781602..9ba59f6640 100644
--- a/openverse_catalog/dags/common_api_workflows.py
+++ b/openverse_catalog/dags/common_api_workflows.py
@@ -52,8 +52,7 @@ def create_dag(
     )
 
     with dag:
-        run_task = get_runner_operator(dag, source, script_location)
-        run_task
+        get_runner_operator(source, script_location)
 
     return dag
 
diff --git a/openverse_catalog/dags/commoncrawl_etl.py b/openverse_catalog/dags/commoncrawl_etl.py
index ae35f0065e..a65749508b 100644
--- a/openverse_catalog/dags/commoncrawl_etl.py
+++ b/openverse_catalog/dags/commoncrawl_etl.py
@@ -166,12 +166,10 @@
     catchup=False,
 ) as dag:
     check_for_cc_index = operators.get_check_cc_index_in_s3_sensor(
-        dag,
         AWS_CONN_ID,
     )
 
     check_for_wat_file = operators.get_check_wat_file_in_s3_sensor(
-        dag,
         AWS_CONN_ID,
     )
 
diff --git a/openverse_catalog/dags/europeana_sub_provider_update_workflow.py b/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
index e15c6f2fc8..c051d0ab23 100644
--- a/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
@@ -50,10 +50,7 @@ def create_dag(
     )
 
     with dag:
-        run_task = operators.get_europeana_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        run_task
+        operators.get_europeana_sub_provider_update_operator(postgres_conn_id)
 
     return dag
 
diff --git a/openverse_catalog/dags/flickr_sub_provider_update_workflow.py b/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
index fcb887d403..f53c8d0902 100644
--- a/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
@@ -50,10 +50,7 @@ def create_dag(
     )
 
     with dag:
-        run_task = operators.get_flickr_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        run_task
+        operators.get_flickr_sub_provider_update_operator(postgres_conn_id)
 
     return dag
 
diff --git a/openverse_catalog/dags/image_expiration_workflow.py b/openverse_catalog/dags/image_expiration_workflow.py
index 8276c826cc..a7a988703a 100644
--- a/openverse_catalog/dags/image_expiration_workflow.py
+++ b/openverse_catalog/dags/image_expiration_workflow.py
@@ -49,11 +49,10 @@ def create_dag(
     )
 
     with dag:
-        run_task_list = [
-            operators.get_image_expiration_operator(dag, postgres_conn_id, provider)
+        [
+            operators.get_image_expiration_operator(postgres_conn_id, provider)
             for provider in sql.OLDEST_PER_PROVIDER
         ]
-        run_task_list
 
     return dag
 
diff --git a/openverse_catalog/dags/loader_workflow.py b/openverse_catalog/dags/loader_workflow.py
index 8829436f5c..c1d1c7a388 100644
--- a/openverse_catalog/dags/loader_workflow.py
+++ b/openverse_catalog/dags/loader_workflow.py
@@ -111,55 +111,47 @@ def create_dag(
 
     with dag:
         stage_oldest_tsv_file = operators.get_file_staging_operator(
-            dag,
             output_dir,
             minimum_file_age_minutes,
             identifier=identifier,
         )
         create_loading_table = operators.get_table_creator_operator(
-            dag,
             postgres_conn_id,
             identifier=identifier,
         )
         copy_to_s3 = operators.get_copy_to_s3_operator(
-            dag,
             output_dir,
             storage_bucket,
             aws_conn_id,
             identifier=identifier,
         )
         load_s3_data = operators.get_load_s3_data_operator(
-            dag,
             storage_bucket,
             aws_conn_id,
             postgres_conn_id,
             overwrite=overwrite,
             identifier=identifier,
         )
-        one_failed_s3 = operators.get_one_failed_switch(dag, "s3")
+        one_failed_s3 = operators.get_one_failed_switch("s3")
         load_local_data = operators.get_load_local_data_operator(
-            dag,
             output_dir,
             postgres_conn_id,
             overwrite=overwrite,
             identifier=identifier,
         )
-        one_success_save = operators.get_one_success_switch(dag, "save")
-        all_done_save = operators.get_all_done_switch(dag, "save")
-        all_failed_save = operators.get_all_failed_switch(dag, "save")
+        one_success_save = operators.get_one_success_switch("save")
+        all_done_save = operators.get_all_done_switch("save")
+        all_failed_save = operators.get_all_failed_switch("save")
         delete_staged_file = operators.get_file_deletion_operator(
-            dag,
             output_dir,
             identifier=identifier,
         )
-        one_failed_delete = operators.get_one_failed_switch(dag, "delete")
+        one_failed_delete = operators.get_one_failed_switch("delete")
         drop_loading_table = operators.get_drop_table_operator(
-            dag,
             postgres_conn_id,
             identifier=identifier,
         )
         move_staged_failures = operators.get_failure_moving_operator(
-            dag,
             output_dir,
             identifier=identifier,
         )
diff --git a/openverse_catalog/dags/recreate_audio_popularity_calculation.py b/openverse_catalog/dags/recreate_audio_popularity_calculation.py
index 62bde8b200..301e9eb5f9 100644
--- a/openverse_catalog/dags/recreate_audio_popularity_calculation.py
+++ b/openverse_catalog/dags/recreate_audio_popularity_calculation.py
@@ -54,42 +54,34 @@ def create_dag(
     )
     with dag:
         drop_relations = operators.drop_media_popularity_relations(
-            dag,
             postgres_conn_id,
             "audio",
         )
         drop_functions = operators.drop_media_popularity_functions(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_metrics = operators.create_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             "audio",
         )
         update_metrics = operators.update_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_percentile = operators.create_media_popularity_percentile(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_constants = operators.create_media_popularity_constants(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_popularity = operators.create_media_standardized_popularity(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_db_view = operators.create_db_view(
-            dag,
             postgres_conn_id,
             "audio",
         )
diff --git a/openverse_catalog/dags/recreate_image_popularity_calculation.py b/openverse_catalog/dags/recreate_image_popularity_calculation.py
index daae19bd34..e04d34950c 100644
--- a/openverse_catalog/dags/recreate_image_popularity_calculation.py
+++ b/openverse_catalog/dags/recreate_image_popularity_calculation.py
@@ -54,29 +54,21 @@ def create_dag(
     )
     with dag:
         drop_relations = operators.drop_media_popularity_relations(
-            dag,
             postgres_conn_id,
         )
         drop_functions = operators.drop_media_popularity_functions(
-            dag,
             postgres_conn_id,
         )
-        create_metrics = operators.create_media_popularity_metrics(
-            dag, postgres_conn_id
-        )
-        update_metrics = operators.update_media_popularity_metrics(
-            dag, postgres_conn_id
-        )
+        create_metrics = operators.create_media_popularity_metrics(postgres_conn_id)
+        update_metrics = operators.update_media_popularity_metrics(postgres_conn_id)
         create_percentile = operators.create_media_popularity_percentile(
-            dag, postgres_conn_id
-        )
-        create_constants = operators.create_media_popularity_constants(
-            dag, postgres_conn_id
+            postgres_conn_id
         )
+        create_constants = operators.create_media_popularity_constants(postgres_conn_id)
         create_popularity = operators.create_media_standardized_popularity(
-            dag, postgres_conn_id
+            postgres_conn_id
         )
-        create_image_view = operators.create_db_view(dag, postgres_conn_id)
+        create_image_view = operators.create_db_view(postgres_conn_id)
 
         (
             [drop_relations, drop_functions]
diff --git a/openverse_catalog/dags/refresh_all_audio_popularity_data.py b/openverse_catalog/dags/refresh_all_audio_popularity_data.py
index dd54107ad0..c722db6943 100644
--- a/openverse_catalog/dags/refresh_all_audio_popularity_data.py
+++ b/openverse_catalog/dags/refresh_all_audio_popularity_data.py
@@ -53,17 +53,14 @@ def create_dag(
     )
     with dag:
         update_metrics = operators.update_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             media_type="audio",
         )
         update_constants = operators.update_media_popularity_constants(
-            dag,
             postgres_conn_id,
             media_type="audio",
         )
         update_image_view = operators.update_db_view(
-            dag,
             postgres_conn_id,
             media_type="audio",
         )
diff --git a/openverse_catalog/dags/refresh_all_image_popularity_data.py b/openverse_catalog/dags/refresh_all_image_popularity_data.py
index 45ccdf45b0..b79fa9ce45 100644
--- a/openverse_catalog/dags/refresh_all_image_popularity_data.py
+++ b/openverse_catalog/dags/refresh_all_image_popularity_data.py
@@ -52,13 +52,9 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        update_metrics = operators.update_media_popularity_metrics(
-            dag, postgres_conn_id
-        )
-        update_constants = operators.update_media_popularity_constants(
-            dag, postgres_conn_id
-        )
-        update_image_view = operators.update_db_view(dag, postgres_conn_id)
+        update_metrics = operators.update_media_popularity_metrics(postgres_conn_id)
+        update_constants = operators.update_media_popularity_constants(postgres_conn_id)
+        update_image_view = operators.update_db_view(postgres_conn_id)
 
         (update_metrics >> update_constants >> update_image_view)
 
diff --git a/openverse_catalog/dags/refresh_audio_view_data.py b/openverse_catalog/dags/refresh_audio_view_data.py
index 353f080e41..59c72aea4d 100644
--- a/openverse_catalog/dags/refresh_audio_view_data.py
+++ b/openverse_catalog/dags/refresh_audio_view_data.py
@@ -55,10 +55,7 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        update_audio_view = operators.update_db_view(
-            dag, postgres_conn_id, media_type="audio"
-        )
-        update_audio_view
+        operators.update_db_view(postgres_conn_id, media_type="audio")
 
     return dag
 
diff --git a/openverse_catalog/dags/refresh_image_view_data.py b/openverse_catalog/dags/refresh_image_view_data.py
index 999c7a717d..0f321f0938 100644
--- a/openverse_catalog/dags/refresh_image_view_data.py
+++ b/openverse_catalog/dags/refresh_image_view_data.py
@@ -55,8 +55,7 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        update_image_view = operators.update_db_view(dag, postgres_conn_id)
-        update_image_view
+        operators.update_db_view(postgres_conn_id)
 
     return dag
 
diff --git a/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py b/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
index 14bf05e860..44fcc9068a 100644
--- a/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
@@ -49,10 +49,7 @@ def create_dag(
     )
 
     with dag:
-        run_task = operators.get_smithsonian_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        run_task
+        operators.get_smithsonian_sub_provider_update_operator(postgres_conn_id)
 
     return dag
 
diff --git a/openverse_catalog/dags/sync_commoncrawl_workflow.py b/openverse_catalog/dags/sync_commoncrawl_workflow.py
index 8003263976..e8f6400a23 100644
--- a/openverse_catalog/dags/sync_commoncrawl_workflow.py
+++ b/openverse_catalog/dags/sync_commoncrawl_workflow.py
@@ -28,24 +28,22 @@
 )
 
 
-def get_creator_operator(dag):
+def get_creator_operator():
     return PythonOperator(
         task_id="create_tsv_directory",
         python_callable=os.makedirs,
         op_args=[CRAWL_OUTPUT_DIR],
         op_kwargs={"exist_ok": True},
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_syncer_operator(dag):
+def get_syncer_operator():
     return BashOperator(
         task_id="sync_commoncrawl_workflow",
         bash_command=(
             f"python {airflowHome}/dags/"
             "commoncrawl_s3_syncer/SyncImageProviders.py"
         ),
-        dag=dag,
         env={
             "S3_BUCKET": os.environ["S3_BUCKET"],
             "OUTPUT_DIR": CRAWL_OUTPUT_DIR,
@@ -55,23 +53,21 @@
     )
 
 
-def get_cleaner_operator(dag):
+def get_cleaner_operator():
     return PythonOperator(
         task_id="clean_commoncrawl_tsvs",
         python_callable=clean_tsv_directory,
         op_args=[CRAWL_OUTPUT_DIR],
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_deleter_operator(dag):
+def get_deleter_operator():
     return PythonOperator(
         task_id="empty_tsv_directory",
         python_callable=_empty_tsv_dir,
         op_args=[CRAWL_OUTPUT_DIR],
         depends_on_past=False,
-        dag=dag,
     )
 
 
@@ -90,12 +86,13 @@ def create_dag():
     )
 
     with dag:
-        create_dir_task = get_creator_operator(dag)
-        sync_tsvs_task = get_syncer_operator(dag)
-        clean_tsvs_task = get_cleaner_operator(dag)
-        empty_dir_task = get_deleter_operator(dag)
+        create_dir_task = get_creator_operator()
+        sync_tsvs_task = get_syncer_operator()
+        clean_tsvs_task = get_cleaner_operator()
+        empty_dir_task = get_deleter_operator()
 
         (create_dir_task >> sync_tsvs_task >> clean_tsvs_task >> empty_dir_task)
+
     return dag
 
diff --git a/openverse_catalog/dags/util/dag_factory.py b/openverse_catalog/dags/util/dag_factory.py
index fbfabc8bf5..8fd852bbda 100644
--- a/openverse_catalog/dags/util/dag_factory.py
+++ b/openverse_catalog/dags/util/dag_factory.py
@@ -74,12 +74,12 @@ def create_provider_api_workflow(
 
     with dag:
         if dated:
-            run_task = ops.get_dated_main_runner_operator(
-                dag, main_function, dagrun_timeout, day_shift=day_shift
+            ops.get_dated_main_runner_operator(
+                main_function, dagrun_timeout, day_shift=day_shift
             )
         else:
-            run_task = ops.get_main_runner_operator(dag, main_function)
-        run_task
+            ops.get_main_runner_operator(main_function)
+
     return dag
 
 
@@ -180,10 +180,10 @@ def create_day_partitioned_ingestion_dag(
     )
     with dag:
         ingest_operator_list_list = _build_ingest_operator_list_list(
-            reingestion_day_list_list, dag, main_function, ingestion_task_timeout
+            reingestion_day_list_list, main_function, ingestion_task_timeout
         )
         for i in range(len(ingest_operator_list_list) - 1):
-            wait_operator = ops.get_wait_till_done_operator(dag, f"wait_L{i}")
+            wait_operator = ops.get_wait_till_done_operator(f"wait_L{i}")
             cross_downstream(ingest_operator_list_list[i], [wait_operator])
             wait_operator >> ingest_operator_list_list[i + 1]
         ingest_operator_list_list[-1]
@@ -192,14 +192,13 @@ def create_day_partitioned_ingestion_dag(
 
 
 def _build_ingest_operator_list_list(
-    reingestion_day_list_list, dag, main_function, ingestion_task_timeout
+    reingestion_day_list_list, main_function, ingestion_task_timeout
 ):
     if reingestion_day_list_list[0] != [0]:
         reingestion_day_list_list = [[0]] + reingestion_day_list_list
     return [
         [
             ops.get_dated_main_runner_operator(
-                dag,
                 main_function,
                 ingestion_task_timeout,
                 day_shift=d,
diff --git a/openverse_catalog/dags/util/etl/operators.py b/openverse_catalog/dags/util/etl/operators.py
index ddee529477..08f1b1517f 100644
--- a/openverse_catalog/dags/util/etl/operators.py
+++ b/openverse_catalog/dags/util/etl/operators.py
@@ -12,7 +12,7 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 
-def get_check_cc_index_in_s3_sensor(dag, aws_conn_id):
+def get_check_cc_index_in_s3_sensor(aws_conn_id):
     return S3PrefixSensor(
         task_id="check_for_cc_index",
         retries=0,
@@ -26,7 +26,7 @@ def get_check_cc_index_in_s3_sensor(dag, aws_conn_id):
     )
 
 
-def get_check_wat_file_in_s3_sensor(dag, aws_conn_id):
+def get_check_wat_file_in_s3_sensor(aws_conn_id):
     return S3KeySensor(
         task_id="check_for_wat_file",
         retries=0,
diff --git a/openverse_catalog/dags/util/loader/operators.py b/openverse_catalog/dags/util/loader/operators.py
index f39bd066e9..78436e1df5 100644
--- a/openverse_catalog/dags/util/loader/operators.py
+++ b/openverse_catalog/dags/util/loader/operators.py
@@ -12,27 +12,25 @@
 
 
 def get_file_staging_operator(
-    dag, output_dir, minimum_file_age_minutes, identifier=TIMESTAMP_TEMPLATE
+    output_dir, minimum_file_age_minutes, identifier=TIMESTAMP_TEMPLATE
 ):
     return ShortCircuitOperator(
         task_id="stage_oldest_tsv_file",
         python_callable=paths.stage_oldest_tsv_file,
         op_args=[output_dir, identifier, minimum_file_age_minutes],
-        dag=dag,
     )
 
 
-def get_table_creator_operator(dag, postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
+def get_table_creator_operator(postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="create_loading_table",
         python_callable=sql.create_loading_table,
         op_args=[postgres_conn_id, identifier],
-        dag=dag,
     )
 
 
 def get_load_local_data_operator(
-    dag, output_dir, postgres_conn_id, overwrite=False, identifier=TIMESTAMP_TEMPLATE
+    output_dir, postgres_conn_id, overwrite=False, identifier=TIMESTAMP_TEMPLATE
 ):
     return PythonOperator(
         task_id="load_local_data",
@@ -40,23 +38,20 @@ def get_load_local_data_operator(
         op_kwargs={"overwrite": overwrite},
         op_args=[output_dir, postgres_conn_id, identifier],
         trigger_rule=TriggerRule.ALL_SUCCESS,
-        dag=dag,
     )
 
 
 def get_copy_to_s3_operator(
-    dag, output_dir, storage_bucket, aws_conn_id, identifier=TIMESTAMP_TEMPLATE
+    output_dir, storage_bucket, aws_conn_id, identifier=TIMESTAMP_TEMPLATE
 ):
     return PythonOperator(
         task_id="copy_to_s3",
         python_callable=loader.copy_to_s3,
         op_args=[output_dir, storage_bucket, identifier, aws_conn_id],
-        dag=dag,
     )
 
 
 def get_load_s3_data_operator(
-    dag,
     bucket,
     aws_conn_id,
     postgres_conn_id,
@@ -68,124 +63,99 @@ def get_load_s3_data_operator(
         python_callable=loader.load_s3_data,
         op_kwargs={"overwrite": overwrite},
         op_args=[bucket, aws_conn_id, postgres_conn_id, identifier],
-        dag=dag,
     )
 
 
-def get_one_failed_switch(dag, identifier):
+def get_one_failed_switch(identifier):
     return DummyOperator(
         task_id=f"one_failed_{identifier}",
         trigger_rule=TriggerRule.ONE_FAILED,
-        dag=dag,
     )
 
 
-def get_all_failed_switch(dag, identifier):
+def get_all_failed_switch(identifier):
     return DummyOperator(
         task_id=f"all_failed_{identifier}",
         trigger_rule=TriggerRule.ALL_FAILED,
-        dag=dag,
     )
 
 
-def get_one_success_switch(dag, identifier):
+def get_one_success_switch(identifier):
     return DummyOperator(
         task_id=f"one_success_{identifier}",
         trigger_rule=TriggerRule.ONE_SUCCESS,
-        dag=dag,
     )
 
 
-def get_all_done_switch(dag, identifier):
+def get_all_done_switch(identifier):
     return DummyOperator(
         task_id=f"all_done_{identifier}",
         trigger_rule=TriggerRule.ALL_DONE,
-        dag=dag,
     )
 
 
-def get_file_deletion_operator(dag, output_dir, identifier=TIMESTAMP_TEMPLATE):
+def get_file_deletion_operator(output_dir, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="delete_staged_file",
         python_callable=paths.delete_staged_file,
         op_args=[output_dir, identifier],
         trigger_rule=TriggerRule.ALL_SUCCESS,
-        dag=dag,
     )
 
 
-def get_drop_table_operator(dag, postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
+def get_drop_table_operator(postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="drop_loading_table",
         python_callable=sql.drop_load_table,
         op_args=[postgres_conn_id, identifier],
         trigger_rule=TriggerRule.ALL_DONE,
-        dag=dag,
     )
 
 
-def get_failure_moving_operator(dag, output_dir, identifier=TIMESTAMP_TEMPLATE):
+def get_failure_moving_operator(output_dir, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="move_staged_failures",
         python_callable=paths.move_staged_files_to_failure_directory,
         op_args=[output_dir, identifier],
         trigger_rule=TriggerRule.ONE_SUCCESS,
-        dag=dag,
     )
 
 
-def get_flickr_sub_provider_update_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_flickr_sub_provider_update_operator(postgres_conn_id):
     return PythonOperator(
         task_id="update_flickr_sub_providers",
         python_callable=sql.update_flickr_sub_providers,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_europeana_sub_provider_update_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_europeana_sub_provider_update_operator(postgres_conn_id):
    return PythonOperator(
         task_id="update_europeana_sub_providers",
         python_callable=sql.update_europeana_sub_providers,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_smithsonian_sub_provider_update_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_smithsonian_sub_provider_update_operator(postgres_conn_id):
     return PythonOperator(
         task_id="update_smithsonian_sub_providers",
         python_callable=sql.update_smithsonian_sub_providers,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_smithsonian_unit_code_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_smithsonian_unit_code_operator(postgres_conn_id):
     return PythonOperator(
         task_id="check_new_smithsonian_unit_codes",
         python_callable=smithsonian_unit_codes.alert_unit_codes_from_api,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_image_expiration_operator(dag, postgres_conn_id, provider):
+def get_image_expiration_operator(postgres_conn_id, provider):
     return PythonOperator(
         task_id=f"expire_outdated_images_of_{provider}",
         python_callable=sql.expire_old_images,
         op_args=[postgres_conn_id, provider],
-        dag=dag,
     )
diff --git a/openverse_catalog/dags/util/operator_util.py b/openverse_catalog/dags/util/operator_util.py
index 69eb31459f..94fde5cac0 100644
--- a/openverse_catalog/dags/util/operator_util.py
+++ b/openverse_catalog/dags/util/operator_util.py
@@ -4,16 +4,14 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 
-def get_runner_operator(dag, source, script_location):
+def get_runner_operator(source, script_location):
     return BashOperator(
         task_id=f"get_{source}_images",
         bash_command=f"python {script_location} --mode default",
-        dag=dag,
     )
 
 
 def get_dated_main_runner_operator(
-    dag,
     main_function,
     execution_timeout,
     day_shift=0,
@@ -26,18 +24,16 @@ def get_dated_main_runner_operator(
         op_args=[args_str],
         execution_timeout=execution_timeout,
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_main_runner_operator(dag, main_function):
+def get_main_runner_operator(main_function):
     return PythonOperator(
         task_id="pull_image_data",
         python_callable=main_function,
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_wait_till_done_operator(dag, task_id):
-    return DummyOperator(task_id=task_id, trigger_rule=TriggerRule.ALL_DONE, dag=dag)
+def get_wait_till_done_operator(task_id):
+    return DummyOperator(task_id=task_id, trigger_rule=TriggerRule.ALL_DONE)
diff --git a/openverse_catalog/dags/util/popularity/operators.py b/openverse_catalog/dags/util/popularity/operators.py
index 613dcb4954..f0dcb8972a 100644
--- a/openverse_catalog/dags/util/popularity/operators.py
+++ b/openverse_catalog/dags/util/popularity/operators.py
@@ -8,7 +8,6 @@
 
 
 def drop_media_popularity_relations(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -16,12 +15,10 @@ def drop_media_popularity_relations(
         task_id="drop_media_popularity_relations",
         python_callable=sql.drop_media_popularity_relations,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def drop_media_popularity_functions(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -29,12 +26,10 @@ def drop_media_popularity_functions(
         task_id=f"drop_{media_type}_popularity_functions",
         python_callable=sql.drop_media_popularity_functions,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_popularity_metrics(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -42,12 +37,10 @@ def create_media_popularity_metrics(
         task_id=f"create_{media_type}_popularity_metrics_table",
         python_callable=sql.create_media_popularity_metrics,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def update_media_popularity_metrics(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -55,12 +48,10 @@ def update_media_popularity_metrics(
         task_id=f"update_{media_type}_popularity_metrics_table",
         python_callable=sql.update_media_popularity_metrics,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_popularity_percentile(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -68,12 +59,10 @@ def create_media_popularity_percentile(
         task_id=f"create_{media_type}_popularity_percentile",
         python_callable=sql.create_media_popularity_percentile_function,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_popularity_constants(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -81,12 +70,10 @@ def create_media_popularity_constants(
         task_id=f"create_{media_type}_popularity_constants_view",
         python_callable=sql.create_media_popularity_constants_view,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def update_media_popularity_constants(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -94,12 +81,10 @@ def update_media_popularity_constants(
         task_id=f"update_{media_type}_popularity_constants_view",
         python_callable=sql.update_media_popularity_constants,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_standardized_popularity(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -107,23 +92,20 @@ def create_media_standardized_popularity(
         task_id=f"create_{media_type}_standardized_popularity",
         python_callable=sql.create_standardized_media_popularity_function,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
-def create_db_view(dag, postgres_conn_id, media_type="image"):
+def create_db_view(postgres_conn_id, media_type="image"):
     return PythonOperator(
         task_id=f"create_{media_type}_view",
         python_callable=sql.create_media_view,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
-def update_db_view(dag, postgres_conn_id, media_type="image"):
+def update_db_view(postgres_conn_id, media_type="image"):
     return PythonOperator(
         task_id=f"update_{media_type}_view",
         python_callable=sql.update_db_view,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
diff --git a/tests/dags/util/test_operator_util.py b/tests/dags/util/test_operator_util.py
index e9ca133978..b2d70a8af6 100644
--- a/tests/dags/util/test_operator_util.py
+++ b/tests/dags/util/test_operator_util.py
@@ -1,52 +1,52 @@
-from datetime import datetime, timedelta, timezone
-
-import util.operator_util as op_util
-from airflow import DAG
-from airflow.models.taskinstance import TaskInstance
-
-
-def dated(dag_date):
-    print(dag_date)
-
-
-def test_get_runner_operator_creates_valid_string():
-    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-    runner = op_util.get_runner_operator(dag, "test_source", "/test/script/location.py")
-    expected_command = "python /test/script/location.py --mode default"
-    assert runner.bash_command == expected_command
-
-
-def test_get_dated_main_runner_handles_zero_shift(capsys):
-    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-    execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
-        tzinfo=timezone.utc
-    )
-    main_func = dated
-    runner = op_util.get_dated_main_runner_operator(
-        dag, main_func, timedelta(minutes=1)
-    )
-    ti = TaskInstance(runner, execution_date)
-    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
-    # main_func.assert_called_with('2019-01-01')
-    # Mocking main_func causes errors because Airflow JSON-encodes it,
-    # and MagicMock is not JSON-serializable.
-    captured = capsys.readouterr()
-    assert captured.out == "2019-01-01\n"
-
-
-def test_get_dated_main_runner_handles_day_shift(capsys):
-    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-    execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
-        tzinfo=timezone.utc
-    )
-    main_func = dated
-    runner = op_util.get_dated_main_runner_operator(
-        dag, main_func, timedelta(minutes=1), day_shift=1
-    )
-    ti = TaskInstance(runner, execution_date)
-    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
-    # main_func.assert_called_with('2018-12-31')
-    # Mocking main_func causes errors because Airflow JSON-encodes it,
-    # and MagicMock is not JSON-serializable.
-    captured = capsys.readouterr()
-    assert captured.out == "2018-12-31\n"
+# from datetime import datetime, timedelta, timezone
+
+# import util.operator_util as op_util
+# from airflow import DAG
+# from airflow.models.taskinstance import TaskInstance
+
+
+# def dated(dag_date):l
+#     print(dag_date)
+
+
+# def test_get_runner_operator_creates_valid_string():
+#     dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
+#     runner = op_util.get_runner_operator("test_source", "/test/script/location.py")
+#     expected_command = "python /test/script/location.py --mode default"
+#     assert runner.bash_command == expected_command
+
+
+# def test_get_dated_main_runner_handles_zero_shift(capsys):
+#     dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
+#     execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
+#         tzinfo=timezone.utc
+#     )
+#     main_func = dated
+#     runner = op_util.get_dated_main_runner_operator(
+#         main_func, timedelta(minutes=1)
+#     )
+#     ti = TaskInstance(runner, execution_date)
+#     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+#     # main_func.assert_called_with('2019-01-01')
+#     # Mocking main_func causes errors because Airflow JSON-encodes it,
+#     # and MagicMock is not JSON-serializable.
+#     captured = capsys.readouterr()
+#     assert captured.out == "2019-01-01\n"
+
+
+# def test_get_dated_main_runner_handles_day_shift(capsys):
+#     dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
+#     execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
+#         tzinfo=timezone.utc
+#     )
+#     main_func = dated
+#     runner = op_util.get_dated_main_runner_operator(
+#         main_func, timedelta(minutes=1), day_shift=1
+#     )
+#     ti = TaskInstance(runner, execution_date)
+#     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+#     # main_func.assert_called_with('2018-12-31')
+#     # Mocking main_func causes errors because Airflow JSON-encodes it,
+#     # and MagicMock is not JSON-serializable.
+#     captured = capsys.readouterr()
+#     assert captured.out == "2018-12-31\n"
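[Series note: PATCH 2/4 is behavior-preserving because of Airflow's DAG context manager: entering a `with dag:` block pushes the DAG onto an internal context stack, and any operator instantiated while that context is active is attached to the DAG automatically, which is what makes every removed `dag=dag` kwarg and `dag` parameter redundant. A minimal, self-contained sketch of that behavior — illustrative only, with a made-up `context_demo` DAG id and `pull_data` callable, assuming Airflow 2.x as used elsewhere in this series:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def pull_data():
    print("pulling data")


# No dag=dag kwarg anywhere: instantiating the operator inside the
# context manager is enough to register it on the DAG.
with DAG(dag_id="context_demo", start_date=datetime(2021, 10, 8)) as dag:
    run_task = PythonOperator(task_id="pull_data", python_callable=pull_data)

# The context manager attached the task to the DAG on instantiation.
assert run_task.dag is dag
assert "pull_data" in dag.task_dict

The flip side is that an operator created with no `dag` kwarg and no active context belongs to no DAG at all, which is presumably why this patch comments the tests out and the next patch re-enables them inside `with dag:` blocks.]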
From c85249804a3504570b797f39f1ea7a35919ef020 Mon Sep 17 00:00:00 2001
From: Olga Bulat
Date: Mon, 11 Oct 2021 08:23:14 +0300
Subject: [PATCH 3/4] Use `with dag` in `test_operator_util`

Signed-off-by: Olga Bulat
---
 tests/dags/util/test_operator_util.py | 105 +++++++++++++-------------
 1 file changed, 53 insertions(+), 52 deletions(-)

diff --git a/tests/dags/util/test_operator_util.py b/tests/dags/util/test_operator_util.py
index b2d70a8af6..e562554ece 100644
--- a/tests/dags/util/test_operator_util.py
+++ b/tests/dags/util/test_operator_util.py
@@ -1,52 +1,53 @@
-# from datetime import datetime, timedelta, timezone
-
-# import util.operator_util as op_util
-# from airflow import DAG
-# from airflow.models.taskinstance import TaskInstance
-
-
-# def dated(dag_date):l
-#     print(dag_date)
-
-
-# def test_get_runner_operator_creates_valid_string():
-#     dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-#     runner = op_util.get_runner_operator("test_source", "/test/script/location.py")
-#     expected_command = "python /test/script/location.py --mode default"
-#     assert runner.bash_command == expected_command
-
-
-# def test_get_dated_main_runner_handles_zero_shift(capsys):
-#     dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-#     execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
-#         tzinfo=timezone.utc
-#     )
-#     main_func = dated
-#     runner = op_util.get_dated_main_runner_operator(
-#         main_func, timedelta(minutes=1)
-#     )
-#     ti = TaskInstance(runner, execution_date)
-#     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
-#     # main_func.assert_called_with('2019-01-01')
-#     # Mocking main_func causes errors because Airflow JSON-encodes it,
-#     # and MagicMock is not JSON-serializable.
-#     captured = capsys.readouterr()
-#     assert captured.out == "2019-01-01\n"
-
-
-# def test_get_dated_main_runner_handles_day_shift(capsys):
-#     dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-#     execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
-#         tzinfo=timezone.utc
-#     )
-#     main_func = dated
-#     runner = op_util.get_dated_main_runner_operator(
-#         main_func, timedelta(minutes=1), day_shift=1
-#     )
-#     ti = TaskInstance(runner, execution_date)
-#     ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
-#     # main_func.assert_called_with('2018-12-31')
-#     # Mocking main_func causes errors because Airflow JSON-encodes it,
-#     # and MagicMock is not JSON-serializable.
-#     captured = capsys.readouterr()
-#     assert captured.out == "2018-12-31\n"
+from datetime import datetime, timedelta, timezone
+
+import util.operator_util as op_util
+from airflow import DAG
+from airflow.models.taskinstance import TaskInstance
+
+
+def dated(dag_date):
+    print(dag_date)
+
+
+def test_get_runner_operator_creates_valid_string():
+    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
+    with dag:
+        runner = op_util.get_runner_operator("test_source", "/test/script/location.py")
+    expected_command = "python /test/script/location.py --mode default"
+    assert runner.bash_command == expected_command
+
+
+def test_get_dated_main_runner_handles_zero_shift(capsys):
+    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
+    execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
+        tzinfo=timezone.utc
+    )
+    main_func = dated
+    with dag:
+        runner = op_util.get_dated_main_runner_operator(main_func, timedelta(minutes=1))
+    ti = TaskInstance(runner, execution_date)
+    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+    # main_func.assert_called_with('2019-01-01')
+    # Mocking main_func causes errors because Airflow JSON-encodes it,
+    # and MagicMock is not JSON-serializable.
+    captured = capsys.readouterr()
+    assert captured.out == "2019-01-01\n"
+
+
+def test_get_dated_main_runner_handles_day_shift(capsys):
+    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
+    execution_date = datetime.strptime("2019-01-01", "%Y-%m-%d").replace(
+        tzinfo=timezone.utc
+    )
+    main_func = dated
+    with dag:
+        runner = op_util.get_dated_main_runner_operator(
+            main_func, timedelta(minutes=1), day_shift=1
+        )
+    ti = TaskInstance(runner, execution_date)
+    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+    # main_func.assert_called_with('2018-12-31')
+    # Mocking main_func causes errors because Airflow JSON-encodes it,
+    # and MagicMock is not JSON-serializable.
+    captured = capsys.readouterr()
+    assert captured.out == "2018-12-31\n"
From 1992ba177f0d407b741ae76728b3c7305f13a68c Mon Sep 17 00:00:00 2001
From: Zack Krida
Date: Mon, 11 Oct 2021 15:51:29 -0400
Subject: [PATCH 4/4] Remove unnecessary dag from operator util test

---
 tests/dags/util/test_operator_util.py | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/tests/dags/util/test_operator_util.py b/tests/dags/util/test_operator_util.py
index e562554ece..b6f9f6651d 100644
--- a/tests/dags/util/test_operator_util.py
+++ b/tests/dags/util/test_operator_util.py
@@ -10,11 +10,9 @@ def dated(dag_date):
 
 
 def test_get_runner_operator_creates_valid_string():
-    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-    with dag:
-        runner = op_util.get_runner_operator("test_source", "/test/script/location.py")
-    expected_command = "python /test/script/location.py --mode default"
-    assert runner.bash_command == expected_command
+    runner = op_util.get_runner_operator("test_source", "/test/script/location.py")
+    expected_command = "python /test/script/location.py --mode default"
+    assert runner.bash_command == expected_command
 
 
 def test_get_dated_main_runner_handles_zero_shift(capsys):
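[Series note: PATCH 4/4 drops the `with dag:` wrapper from the first test entirely. That works because an Airflow operator can be instantiated with no DAG at all when a test only inspects its attributes; it simply cannot be scheduled until it is attached to one. A sketch of the idea, again assuming Airflow 2.x:

from airflow.operators.bash import BashOperator

# No DAG object and no DAG context: fine for attribute-only assertions.
runner = BashOperator(task_id="get_test_source_images", bash_command="echo test")
assert runner.bash_command == "echo test"

The two dated-runner tests keep their `with dag:` blocks because they actually execute a TaskInstance, which does require a DAG.]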