diff --git a/openverse_catalog/dags/airflow_log_cleanup_workflow.py b/openverse_catalog/dags/airflow_log_cleanup_workflow.py
index 0dccdd8094..cf67443cb1 100644
--- a/openverse_catalog/dags/airflow_log_cleanup_workflow.py
+++ b/openverse_catalog/dags/airflow_log_cleanup_workflow.py
@@ -19,7 +19,6 @@
 from datetime import datetime, timedelta
 
 import jinja2
-import util.operator_util as ops
 from airflow.configuration import conf
 from airflow.models import DAG
 from airflow.operators.python import PythonOperator
@@ -52,10 +51,7 @@
 }
 
 
-def get_log_cleaner_operator(
-    dag,
-    base_log_folder,
-):
+def get_log_cleaner_operator(base_log_folder):
     return PythonOperator(
         task_id="log_cleaner_operator",
         python_callable=log_cleanup.clean_up,
@@ -64,7 +60,6 @@ def get_log_cleaner_operator(
             "{{ params.get('maxLogAgeInDays') }}",
             "{{ params.get('enableDelete') }}",
         ],
-        dag=dag,
     )
 
 
@@ -88,14 +83,7 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = get_log_cleaner_operator(
-            dag,
-            BASE_LOG_FOLDER,
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        get_log_cleaner_operator(BASE_LOG_FOLDER)
 
     return dag
diff --git a/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py b/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
index 0443408829..620f2ea3cb 100644
--- a/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
+++ b/openverse_catalog/dags/check_new_smithsonian_unit_codes_workflow.py
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -51,11 +50,7 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_smithsonian_unit_code_operator(dag, postgres_conn_id)
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_smithsonian_unit_code_operator(postgres_conn_id)
 
     return dag
diff --git a/openverse_catalog/dags/cleaner_workflow.py b/openverse_catalog/dags/cleaner_workflow.py
index 55267f6979..ce0ab3d003 100644
--- a/openverse_catalog/dags/cleaner_workflow.py
+++ b/openverse_catalog/dags/cleaner_workflow.py
@@ -5,7 +5,7 @@
 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from util import config, operator_util, pg_cleaner
+from util import config, pg_cleaner
 
 
 logging.basicConfig(
@@ -43,18 +43,12 @@ def create_id_partitioned_cleaner_dag(
     )
     hex_prefixes = pg_cleaner.hex_counter(prefix_length)
     with dag:
-        cleaner_list = [
-            _get_pg_cleaner_operator(dag, prefix, postgres_conn_id)
-            for prefix in hex_prefixes
-        ]
-        start_task = operator_util.get_log_operator(dag, dag.dag_id, "Started")
-        end_task = operator_util.get_log_operator(dag, dag.dag_id, "Ended")
-        start_task >> cleaner_list >> end_task
+        [_get_pg_cleaner_operator(prefix, postgres_conn_id) for prefix in hex_prefixes]
+
     return dag
 
 
 def _get_pg_cleaner_operator(
-    dag,
     prefix,
     postgres_conn_id,
     desired_length=DESIRED_PREFIX_LENGTH,
@@ -70,7 +64,6 @@ def _get_pg_cleaner_operator(
             "delay_minutes": delay,
         },
         depends_on_past=False,
-        dag=dag,
     )
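
Note on the pattern applied throughout this patch: operators instantiated inside an active `with dag:` block are registered on that DAG by Airflow's DAG context manager, so the explicit `dag=dag` keyword and the `dag` argument threaded through the factory helpers become redundant. A minimal illustrative sketch of that behaviour (hypothetical DAG and task names, not code from this repository):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python import PythonOperator


def say_hello():
    print("hello")


dag = DAG(dag_id="context_demo", start_date=datetime(2021, 1, 1), schedule_interval=None)

with dag:
    # No dag= keyword needed: the active DAG context attaches the task.
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)

assert hello.dag is dag
assert "say_hello" in dag.task_dict
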
diff --git a/openverse_catalog/dags/common_api_workflows.py b/openverse_catalog/dags/common_api_workflows.py
index b6c49b445a..9ba59f6640 100644
--- a/openverse_catalog/dags/common_api_workflows.py
+++ b/openverse_catalog/dags/common_api_workflows.py
@@ -4,7 +4,7 @@
 import util.config as conf
 from airflow import DAG
 from croniter import croniter
-from util.operator_util import get_log_operator, get_runner_operator
+from util.operator_util import get_runner_operator
 
 
 logging.basicConfig(
@@ -52,11 +52,7 @@ def create_dag(
     )
 
     with dag:
-        start_task = get_log_operator(dag, source, "starting")
-        run_task = get_runner_operator(dag, source, script_location)
-        end_task = get_log_operator(dag, source, "finished")
-
-        start_task >> run_task >> end_task
+        get_runner_operator(source, script_location)
 
     return dag
diff --git a/openverse_catalog/dags/commoncrawl_etl.py b/openverse_catalog/dags/commoncrawl_etl.py
index 0631154a75..a65749508b 100644
--- a/openverse_catalog/dags/commoncrawl_etl.py
+++ b/openverse_catalog/dags/commoncrawl_etl.py
@@ -165,16 +165,11 @@
     concurrency=1,
     catchup=False,
 ) as dag:
-
-    job_start_logger = operators.get_log_operator(dag, "Starting")
-
     check_for_cc_index = operators.get_check_cc_index_in_s3_sensor(
-        dag,
         AWS_CONN_ID,
     )
 
     check_for_wat_file = operators.get_check_wat_file_in_s3_sensor(
-        dag,
         AWS_CONN_ID,
     )
@@ -207,15 +202,12 @@
         AWS_CONN_ID,
     )
 
-    job_done_logger = operators.get_log_operator(dag, "Finished")
-
     (
-        job_start_logger
-        >> check_for_cc_index
+        check_for_cc_index
         >> check_for_wat_file
         >> [extract_script_loader, cluster_bootstrap_loader]
         >> job_flow_creator
         >> job_sensor
         >> job_flow_terminator
     )
-    [job_flow_creator, job_sensor, job_flow_terminator] >> job_done_logger
+    [job_flow_creator, job_sensor, job_flow_terminator]
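
The commoncrawl_etl chain above now starts at check_for_cc_index and still fans out through a list. As a hedged reminder of the semantics this relies on: `task >> [a, b]` makes both list members downstream, and a following `[a, b] >> task` works because a plain list has no `__rshift__`, so Python falls back to `BaseOperator.__rrshift__` on the right-hand task. A small sketch with hypothetical task ids:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="chain_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    index = DummyOperator(task_id="check_for_cc_index")
    wat = DummyOperator(task_id="check_for_wat_file")
    loaders = [DummyOperator(task_id=f"loader_{i}") for i in range(2)]
    creator = DummyOperator(task_id="job_flow_creator")

    # task >> task >> [list] >> task: each loader ends up between wat and creator.
    index >> wat >> loaders >> creator

assert dag.get_task("job_flow_creator").upstream_task_ids == {"loader_0", "loader_1"}
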
diff --git a/openverse_catalog/dags/europeana_sub_provider_update_workflow.py b/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
index d7b7516d4c..c051d0ab23 100644
--- a/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/europeana_sub_provider_update_workflow.py
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -51,13 +50,7 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_europeana_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_europeana_sub_provider_update_operator(postgres_conn_id)
 
     return dag
diff --git a/openverse_catalog/dags/flickr_sub_provider_update_workflow.py b/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
index 86d592b9ac..f53c8d0902 100644
--- a/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/flickr_sub_provider_update_workflow.py
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -51,13 +50,7 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_flickr_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_flickr_sub_provider_update_operator(postgres_conn_id)
 
     return dag
diff --git a/openverse_catalog/dags/image_expiration_workflow.py b/openverse_catalog/dags/image_expiration_workflow.py
index 88c3765956..a7a988703a 100644
--- a/openverse_catalog/dags/image_expiration_workflow.py
+++ b/openverse_catalog/dags/image_expiration_workflow.py
@@ -7,7 +7,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators, sql
 
@@ -50,14 +49,10 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task_list = [
-            operators.get_image_expiration_operator(dag, postgres_conn_id, provider)
+        [
+            operators.get_image_expiration_operator(postgres_conn_id, provider)
             for provider in sql.OLDEST_PER_PROVIDER
         ]
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task_list >> end_task
 
     return dag
diff --git a/openverse_catalog/dags/loader_workflow.py b/openverse_catalog/dags/loader_workflow.py
index 8829436f5c..c1d1c7a388 100644
--- a/openverse_catalog/dags/loader_workflow.py
+++ b/openverse_catalog/dags/loader_workflow.py
@@ -111,55 +111,47 @@ def create_dag(
 
     with dag:
         stage_oldest_tsv_file = operators.get_file_staging_operator(
-            dag,
             output_dir,
             minimum_file_age_minutes,
             identifier=identifier,
         )
         create_loading_table = operators.get_table_creator_operator(
-            dag,
             postgres_conn_id,
             identifier=identifier,
         )
         copy_to_s3 = operators.get_copy_to_s3_operator(
-            dag,
             output_dir,
             storage_bucket,
             aws_conn_id,
             identifier=identifier,
         )
         load_s3_data = operators.get_load_s3_data_operator(
-            dag,
             storage_bucket,
             aws_conn_id,
             postgres_conn_id,
             overwrite=overwrite,
             identifier=identifier,
         )
-        one_failed_s3 = operators.get_one_failed_switch(dag, "s3")
+        one_failed_s3 = operators.get_one_failed_switch("s3")
         load_local_data = operators.get_load_local_data_operator(
-            dag,
             output_dir,
             postgres_conn_id,
             overwrite=overwrite,
             identifier=identifier,
         )
-        one_success_save = operators.get_one_success_switch(dag, "save")
-        all_done_save = operators.get_all_done_switch(dag, "save")
-        all_failed_save = operators.get_all_failed_switch(dag, "save")
+        one_success_save = operators.get_one_success_switch("save")
+        all_done_save = operators.get_all_done_switch("save")
+        all_failed_save = operators.get_all_failed_switch("save")
         delete_staged_file = operators.get_file_deletion_operator(
-            dag,
             output_dir,
             identifier=identifier,
         )
-        one_failed_delete = operators.get_one_failed_switch(dag, "delete")
+        one_failed_delete = operators.get_one_failed_switch("delete")
         drop_loading_table = operators.get_drop_table_operator(
-            dag,
             postgres_conn_id,
             identifier=identifier,
         )
         move_staged_failures = operators.get_failure_moving_operator(
-            dag,
             output_dir,
             identifier=identifier,
         )
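
The loader_workflow "switch" helpers above (get_one_failed_switch, get_all_done_switch, and so on) are DummyOperators whose only job is their trigger_rule, which decides when the failure/cleanup branches fire. A rough sketch of the idea, with hypothetical task ids and wiring:

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

with DAG(dag_id="switch_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    copy_to_s3 = DummyOperator(task_id="copy_to_s3")
    load_s3_data = DummyOperator(task_id="load_s3_data")

    # Fires as soon as any upstream task fails; recovery tasks hang off it.
    one_failed_s3 = DummyOperator(task_id="one_failed_s3", trigger_rule=TriggerRule.ONE_FAILED)

    # Fires once every upstream task has finished, whether it succeeded or failed.
    all_done_save = DummyOperator(task_id="all_done_save", trigger_rule=TriggerRule.ALL_DONE)

    [copy_to_s3, load_s3_data] >> one_failed_s3
    [copy_to_s3, load_s3_data] >> all_done_save
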
diff --git a/openverse_catalog/dags/recreate_audio_popularity_calculation.py b/openverse_catalog/dags/recreate_audio_popularity_calculation.py
index 66e1813ef7..301e9eb5f9 100644
--- a/openverse_catalog/dags/recreate_audio_popularity_calculation.py
+++ b/openverse_catalog/dags/recreate_audio_popularity_calculation.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -54,58 +53,46 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         drop_relations = operators.drop_media_popularity_relations(
-            dag,
             postgres_conn_id,
             "audio",
         )
         drop_functions = operators.drop_media_popularity_functions(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_metrics = operators.create_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             "audio",
         )
         update_metrics = operators.update_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_percentile = operators.create_media_popularity_percentile(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_constants = operators.create_media_popularity_constants(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_popularity = operators.create_media_standardized_popularity(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_db_view = operators.create_db_view(
-            dag,
             postgres_conn_id,
             "audio",
         )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
 
         (
-            start_task
-            >> [drop_relations, drop_functions]
+            [drop_relations, drop_functions]
             >> create_metrics
             >> [update_metrics, create_percentile]
             >> create_constants
             >> create_popularity
             >> create_db_view
-            >> end_task
         )
     return dag
diff --git a/openverse_catalog/dags/recreate_image_popularity_calculation.py b/openverse_catalog/dags/recreate_image_popularity_calculation.py
index 48e1db0b17..e04d34950c 100644
--- a/openverse_catalog/dags/recreate_image_popularity_calculation.py
+++ b/openverse_catalog/dags/recreate_image_popularity_calculation.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -54,42 +53,30 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         drop_relations = operators.drop_media_popularity_relations(
-            dag,
             postgres_conn_id,
         )
         drop_functions = operators.drop_media_popularity_functions(
-            dag,
             postgres_conn_id,
         )
-        create_metrics = operators.create_media_popularity_metrics(
-            dag, postgres_conn_id
-        )
-        update_metrics = operators.update_media_popularity_metrics(
-            dag, postgres_conn_id
-        )
+        create_metrics = operators.create_media_popularity_metrics(postgres_conn_id)
+        update_metrics = operators.update_media_popularity_metrics(postgres_conn_id)
         create_percentile = operators.create_media_popularity_percentile(
-            dag, postgres_conn_id
-        )
-        create_constants = operators.create_media_popularity_constants(
-            dag, postgres_conn_id
+            postgres_conn_id
         )
+        create_constants = operators.create_media_popularity_constants(postgres_conn_id)
         create_popularity = operators.create_media_standardized_popularity(
-            dag, postgres_conn_id
+            postgres_conn_id
         )
-        create_image_view = operators.create_db_view(dag, postgres_conn_id)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
+        create_image_view = operators.create_db_view(postgres_conn_id)
 
         (
-            start_task
-            >> [drop_relations, drop_functions]
+            [drop_relations, drop_functions]
             >> create_metrics
             >> [update_metrics, create_percentile]
             >> create_constants
             >> create_popularity
             >> create_image_view
-            >> end_task
         )
     return dag
diff --git a/openverse_catalog/dags/refresh_all_audio_popularity_data.py b/openverse_catalog/dags/refresh_all_audio_popularity_data.py
index c1477f874b..c722db6943 100644
--- a/openverse_catalog/dags/refresh_all_audio_popularity_data.py
+++ b/openverse_catalog/dags/refresh_all_audio_popularity_data.py
@@ -11,7 +11,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -53,31 +52,20 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         update_metrics = operators.update_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             media_type="audio",
         )
         update_constants = operators.update_media_popularity_constants(
-            dag,
             postgres_conn_id,
             media_type="audio",
         )
         update_image_view = operators.update_db_view(
-            dag,
             postgres_conn_id,
             media_type="audio",
         )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
 
-        (
-            start_task
-            >> update_metrics
-            >> update_constants
-            >> update_image_view
-            >> end_task
-        )
+        (update_metrics >> update_constants >> update_image_view)
     return dag
diff --git a/openverse_catalog/dags/refresh_all_image_popularity_data.py b/openverse_catalog/dags/refresh_all_image_popularity_data.py
index 22cce4c7a9..b79fa9ce45 100644
--- a/openverse_catalog/dags/refresh_all_image_popularity_data.py
+++ b/openverse_catalog/dags/refresh_all_image_popularity_data.py
@@ -11,7 +11,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -53,23 +52,11 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
-        update_metrics = operators.update_media_popularity_metrics(
-            dag, postgres_conn_id
-        )
-        update_constants = operators.update_media_popularity_constants(
-            dag, postgres_conn_id
-        )
-        update_image_view = operators.update_db_view(dag, postgres_conn_id)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
+        update_metrics = operators.update_media_popularity_metrics(postgres_conn_id)
+        update_constants = operators.update_media_popularity_constants(postgres_conn_id)
+        update_image_view = operators.update_db_view(postgres_conn_id)
 
-        (
-            start_task
-            >> update_metrics
-            >> update_constants
-            >> update_image_view
-            >> end_task
-        )
+        (update_metrics >> update_constants >> update_image_view)
     return dag
diff --git a/openverse_catalog/dags/refresh_audio_view_data.py b/openverse_catalog/dags/refresh_audio_view_data.py
index 871552266a..59c72aea4d 100644
--- a/openverse_catalog/dags/refresh_audio_view_data.py
+++ b/openverse_catalog/dags/refresh_audio_view_data.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -56,13 +55,7 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
-        update_audio_view = operators.update_db_view(
-            dag, postgres_conn_id, media_type="audio"
-        )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
-
-        start_task >> update_audio_view >> end_task
+        operators.update_db_view(postgres_conn_id, media_type="audio")
 
     return dag
diff --git a/openverse_catalog/dags/refresh_image_view_data.py b/openverse_catalog/dags/refresh_image_view_data.py
index af0285e236..0f321f0938 100644
--- a/openverse_catalog/dags/refresh_image_view_data.py
+++ b/openverse_catalog/dags/refresh_image_view_data.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta
 
 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators
 
 
@@ -56,11 +55,7 @@ def create_dag(
         catchup=False,
    )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
-        update_image_view = operators.update_db_view(dag, postgres_conn_id)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
-
-        start_task >> update_image_view >> end_task
+        operators.update_db_view(postgres_conn_id)
 
     return dag
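
In the refresh DAGs above, the `>>` chain is kept only for its side effect of registering dependencies; the parentheses merely group the expression and nothing needs to be assigned or returned. A small sketch of what the trimmed-down body leaves behind (hypothetical ids):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="refresh_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    update_metrics = DummyOperator(task_id="update_metrics")
    update_constants = DummyOperator(task_id="update_constants")
    update_view = DummyOperator(task_id="update_view")

    # Evaluated purely for its dependency-setting side effect.
    (update_metrics >> update_constants >> update_view)

assert dag.get_task("update_view").upstream_task_ids == {"update_constants"}
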
diff --git a/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py b/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
index edc713b0dd..44fcc9068a 100644
--- a/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
+++ b/openverse_catalog/dags/smithsonian_sub_provider_update_workflow.py
@@ -7,7 +7,6 @@
 import os
 from datetime import datetime, timedelta
 
-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators
 
@@ -50,13 +49,7 @@ def create_dag(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_smithsonian_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_smithsonian_sub_provider_update_operator(postgres_conn_id)
 
     return dag
diff --git a/openverse_catalog/dags/sync_commoncrawl_workflow.py b/openverse_catalog/dags/sync_commoncrawl_workflow.py
index 2049f630c6..e8f6400a23 100644
--- a/openverse_catalog/dags/sync_commoncrawl_workflow.py
+++ b/openverse_catalog/dags/sync_commoncrawl_workflow.py
@@ -4,7 +4,6 @@
 from airflow import DAG
 from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
-from util.operator_util import get_log_operator
 from util.tsv_cleaner import clean_tsv_directory
 
 
@@ -29,24 +28,22 @@
 )
 
 
-def get_creator_operator(dag):
+def get_creator_operator():
     return PythonOperator(
         task_id="create_tsv_directory",
         python_callable=os.makedirs,
         op_args=[CRAWL_OUTPUT_DIR],
         op_kwargs={"exist_ok": True},
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_syncer_operator(dag):
+def get_syncer_operator():
     return BashOperator(
         task_id="sync_commoncrawl_workflow",
         bash_command=(
             f"python {airflowHome}/dags/"
             "commoncrawl_s3_syncer/SyncImageProviders.py"
         ),
-        dag=dag,
         env={
             "S3_BUCKET": os.environ["S3_BUCKET"],
             "OUTPUT_DIR": CRAWL_OUTPUT_DIR,
@@ -56,23 +53,21 @@ def get_syncer_operator(dag):
     )
 
 
-def get_cleaner_operator(dag):
+def get_cleaner_operator():
     return PythonOperator(
         task_id="clean_commoncrawl_tsvs",
         python_callable=clean_tsv_directory,
         op_args=[CRAWL_OUTPUT_DIR],
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_deleter_operator(dag):
+def get_deleter_operator():
     return PythonOperator(
         task_id="empty_tsv_directory",
         python_callable=_empty_tsv_dir,
         op_args=[CRAWL_OUTPUT_DIR],
         depends_on_past=False,
-        dag=dag,
     )
 
 
@@ -91,21 +86,13 @@ def create_dag():
     )
 
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
-        create_dir_task = get_creator_operator(dag)
-        sync_tsvs_task = get_syncer_operator(dag)
-        clean_tsvs_task = get_cleaner_operator(dag)
-        empty_dir_task = get_deleter_operator(dag)
-        end_task = get_log_operator(dag, DAG_ID, "Finished")
-
-        (
-            start_task
-            >> create_dir_task
-            >> sync_tsvs_task
-            >> clean_tsvs_task
-            >> empty_dir_task
-            >> end_task
-        )
+        create_dir_task = get_creator_operator()
+        sync_tsvs_task = get_syncer_operator()
+        clean_tsvs_task = get_cleaner_operator()
+        empty_dir_task = get_deleter_operator()
+
+        (create_dir_task >> sync_tsvs_task >> clean_tsvs_task >> empty_dir_task)
+
     return dag
diff --git a/openverse_catalog/dags/util/dag_factory.py b/openverse_catalog/dags/util/dag_factory.py
index 18c2a5ce05..8fd852bbda 100644
--- a/openverse_catalog/dags/util/dag_factory.py
+++ b/openverse_catalog/dags/util/dag_factory.py
@@ -73,16 +73,12 @@ def create_provider_api_workflow(
     )
 
     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
         if dated:
-            run_task = ops.get_dated_main_runner_operator(
-                dag, main_function, dagrun_timeout, day_shift=day_shift
+            ops.get_dated_main_runner_operator(
+                main_function, dagrun_timeout, day_shift=day_shift
             )
         else:
-            run_task = ops.get_main_runner_operator(dag, main_function)
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+            ops.get_main_runner_operator(main_function)
 
     return dag
 
@@ -184,27 +180,25 @@ def create_day_partitioned_ingestion_dag(
     )
     with dag:
         ingest_operator_list_list = _build_ingest_operator_list_list(
-            reingestion_day_list_list, dag, main_function, ingestion_task_timeout
+            reingestion_day_list_list, main_function, ingestion_task_timeout
         )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
         for i in range(len(ingest_operator_list_list) - 1):
-            wait_operator = ops.get_wait_till_done_operator(dag, f"wait_L{i}")
-            cross_downstream(ingest_operator_list_list[i], [wait_operator, end_task])
+            wait_operator = ops.get_wait_till_done_operator(f"wait_L{i}")
+            cross_downstream(ingest_operator_list_list[i], [wait_operator])
             wait_operator >> ingest_operator_list_list[i + 1]
-        ingest_operator_list_list[-1] >> end_task
+        ingest_operator_list_list[-1]
 
     return dag
 
 
 def _build_ingest_operator_list_list(
-    reingestion_day_list_list, dag, main_function, ingestion_task_timeout
+    reingestion_day_list_list, main_function, ingestion_task_timeout
 ):
     if reingestion_day_list_list[0] != [0]:
         reingestion_day_list_list = [[0]] + reingestion_day_list_list
     return [
         [
             ops.get_dated_main_runner_operator(
-                dag,
                 main_function,
                 ingestion_task_timeout,
                 day_shift=d,
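
For the reingestion DAG above, cross_downstream is what wires a whole layer of ingest tasks into the wait task for the next layer: every task in the first argument becomes an upstream of every task in the second. A hedged sketch (assuming the Airflow 2 import path airflow.models.baseoperator.cross_downstream; task ids are hypothetical):

from datetime import datetime

from airflow.models import DAG
from airflow.models.baseoperator import cross_downstream
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="cross_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    layer_0 = [DummyOperator(task_id=f"ingest_day0_{i}") for i in range(2)]
    wait_l0 = DummyOperator(task_id="wait_L0")

    # Equivalent to: for task in layer_0: task >> wait_l0
    cross_downstream(layer_0, [wait_l0])

assert dag.get_task("wait_L0").upstream_task_ids == {"ingest_day0_0", "ingest_day0_1"}
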
diff --git a/openverse_catalog/dags/util/etl/operators.py b/openverse_catalog/dags/util/etl/operators.py
index 725b7eb13d..08f1b1517f 100644
--- a/openverse_catalog/dags/util/etl/operators.py
+++ b/openverse_catalog/dags/util/etl/operators.py
@@ -1,4 +1,3 @@
-from airflow.operators.bash import BashOperator
 from airflow.operators.python import PythonOperator
 from airflow.providers.amazon.aws.hooks.s3 import S3Hook
 from airflow.providers.amazon.aws.operators.emr_create_job_flow import (
@@ -13,15 +12,7 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 
-def get_log_operator(dag, status):
-    return BashOperator(
-        task_id=f"log_{dag.dag_id}_{status}",
-        bash_command=f"echo {status} {dag.dag_id} at $(date)",
-        trigger_rule=TriggerRule.ALL_SUCCESS,
-    )
-
-
-def get_check_cc_index_in_s3_sensor(dag, aws_conn_id):
+def get_check_cc_index_in_s3_sensor(aws_conn_id):
     return S3PrefixSensor(
         task_id="check_for_cc_index",
         retries=0,
@@ -35,7 +26,7 @@ def get_check_cc_index_in_s3_sensor(dag, aws_conn_id):
     )
 
 
-def get_check_wat_file_in_s3_sensor(dag, aws_conn_id):
+def get_check_wat_file_in_s3_sensor(aws_conn_id):
     return S3KeySensor(
         task_id="check_for_wat_file",
         retries=0,
diff --git a/openverse_catalog/dags/util/loader/operators.py b/openverse_catalog/dags/util/loader/operators.py
index f39bd066e9..78436e1df5 100644
--- a/openverse_catalog/dags/util/loader/operators.py
+++ b/openverse_catalog/dags/util/loader/operators.py
@@ -12,27 +12,25 @@
 
 
 def get_file_staging_operator(
-    dag, output_dir, minimum_file_age_minutes, identifier=TIMESTAMP_TEMPLATE
+    output_dir, minimum_file_age_minutes, identifier=TIMESTAMP_TEMPLATE
 ):
     return ShortCircuitOperator(
         task_id="stage_oldest_tsv_file",
         python_callable=paths.stage_oldest_tsv_file,
         op_args=[output_dir, identifier, minimum_file_age_minutes],
-        dag=dag,
     )
 
 
-def get_table_creator_operator(dag, postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
+def get_table_creator_operator(postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="create_loading_table",
         python_callable=sql.create_loading_table,
         op_args=[postgres_conn_id, identifier],
-        dag=dag,
     )
 
 
 def get_load_local_data_operator(
-    dag, output_dir, postgres_conn_id, overwrite=False, identifier=TIMESTAMP_TEMPLATE
+    output_dir, postgres_conn_id, overwrite=False, identifier=TIMESTAMP_TEMPLATE
 ):
     return PythonOperator(
         task_id="load_local_data",
@@ -40,23 +38,20 @@ def get_load_local_data_operator(
         op_kwargs={"overwrite": overwrite},
         op_args=[output_dir, postgres_conn_id, identifier],
         trigger_rule=TriggerRule.ALL_SUCCESS,
-        dag=dag,
     )
 
 
 def get_copy_to_s3_operator(
-    dag, output_dir, storage_bucket, aws_conn_id, identifier=TIMESTAMP_TEMPLATE
+    output_dir, storage_bucket, aws_conn_id, identifier=TIMESTAMP_TEMPLATE
 ):
     return PythonOperator(
         task_id="copy_to_s3",
         python_callable=loader.copy_to_s3,
         op_args=[output_dir, storage_bucket, identifier, aws_conn_id],
-        dag=dag,
     )
 
 
 def get_load_s3_data_operator(
-    dag,
     bucket,
     aws_conn_id,
     postgres_conn_id,
@@ -68,124 +63,99 @@
         python_callable=loader.load_s3_data,
         op_kwargs={"overwrite": overwrite},
         op_args=[bucket, aws_conn_id, postgres_conn_id, identifier],
-        dag=dag,
     )
 
 
-def get_one_failed_switch(dag, identifier):
+def get_one_failed_switch(identifier):
     return DummyOperator(
         task_id=f"one_failed_{identifier}",
         trigger_rule=TriggerRule.ONE_FAILED,
-        dag=dag,
     )
 
 
-def get_all_failed_switch(dag, identifier):
+def get_all_failed_switch(identifier):
     return DummyOperator(
         task_id=f"all_failed_{identifier}",
         trigger_rule=TriggerRule.ALL_FAILED,
-        dag=dag,
     )
 
 
-def get_one_success_switch(dag, identifier):
+def get_one_success_switch(identifier):
     return DummyOperator(
         task_id=f"one_success_{identifier}",
         trigger_rule=TriggerRule.ONE_SUCCESS,
-        dag=dag,
     )
 
 
-def get_all_done_switch(dag, identifier):
+def get_all_done_switch(identifier):
     return DummyOperator(
         task_id=f"all_done_{identifier}",
         trigger_rule=TriggerRule.ALL_DONE,
-        dag=dag,
     )
 
 
-def get_file_deletion_operator(dag, output_dir, identifier=TIMESTAMP_TEMPLATE):
+def get_file_deletion_operator(output_dir, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="delete_staged_file",
         python_callable=paths.delete_staged_file,
         op_args=[output_dir, identifier],
         trigger_rule=TriggerRule.ALL_SUCCESS,
-        dag=dag,
     )
 
 
-def get_drop_table_operator(dag, postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
+def get_drop_table_operator(postgres_conn_id, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="drop_loading_table",
         python_callable=sql.drop_load_table,
         op_args=[postgres_conn_id, identifier],
         trigger_rule=TriggerRule.ALL_DONE,
-        dag=dag,
     )
 
 
-def get_failure_moving_operator(dag, output_dir, identifier=TIMESTAMP_TEMPLATE):
+def get_failure_moving_operator(output_dir, identifier=TIMESTAMP_TEMPLATE):
     return PythonOperator(
         task_id="move_staged_failures",
         python_callable=paths.move_staged_files_to_failure_directory,
         op_args=[output_dir, identifier],
         trigger_rule=TriggerRule.ONE_SUCCESS,
-        dag=dag,
     )
 
 
-def get_flickr_sub_provider_update_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_flickr_sub_provider_update_operator(postgres_conn_id):
     return PythonOperator(
         task_id="update_flickr_sub_providers",
         python_callable=sql.update_flickr_sub_providers,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_europeana_sub_provider_update_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_europeana_sub_provider_update_operator(postgres_conn_id):
     return PythonOperator(
         task_id="update_europeana_sub_providers",
         python_callable=sql.update_europeana_sub_providers,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_smithsonian_sub_provider_update_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_smithsonian_sub_provider_update_operator(postgres_conn_id):
     return PythonOperator(
         task_id="update_smithsonian_sub_providers",
         python_callable=sql.update_smithsonian_sub_providers,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_smithsonian_unit_code_operator(
-    dag,
-    postgres_conn_id,
-):
+def get_smithsonian_unit_code_operator(postgres_conn_id):
     return PythonOperator(
         task_id="check_new_smithsonian_unit_codes",
         python_callable=smithsonian_unit_codes.alert_unit_codes_from_api,
         op_args=[postgres_conn_id],
-        dag=dag,
     )
 
 
-def get_image_expiration_operator(dag, postgres_conn_id, provider):
+def get_image_expiration_operator(postgres_conn_id, provider):
     return PythonOperator(
         task_id=f"expire_outdated_images_of_{provider}",
         python_callable=sql.expire_old_images,
         op_args=[postgres_conn_id, provider],
-        dag=dag,
     )
diff --git a/openverse_catalog/dags/util/operator_util.py b/openverse_catalog/dags/util/operator_util.py
index f8f32b57c5..94fde5cac0 100644
--- a/openverse_catalog/dags/util/operator_util.py
+++ b/openverse_catalog/dags/util/operator_util.py
@@ -4,16 +4,14 @@
 from airflow.utils.trigger_rule import TriggerRule
 
 
-def get_runner_operator(dag, source, script_location):
+def get_runner_operator(source, script_location):
     return BashOperator(
         task_id=f"get_{source}_images",
         bash_command=f"python {script_location} --mode default",
-        dag=dag,
     )
 
 
 def get_dated_main_runner_operator(
-    dag,
     main_function,
     execution_timeout,
     day_shift=0,
@@ -26,26 +24,16 @@ def get_dated_main_runner_operator(
         op_args=[args_str],
         execution_timeout=execution_timeout,
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_main_runner_operator(dag, main_function):
+def get_main_runner_operator(main_function):
     return PythonOperator(
         task_id="pull_image_data",
         python_callable=main_function,
         depends_on_past=False,
-        dag=dag,
     )
 
 
-def get_log_operator(dag, source, status):
-    return BashOperator(
-        task_id=f"{source}_{status}",
-        bash_command=f"echo {status} {source} workflow at $(date)",
-        dag=dag,
-    )
-
-
-def get_wait_till_done_operator(dag, task_id):
-    return DummyOperator(task_id=task_id, trigger_rule=TriggerRule.ALL_DONE, dag=dag)
+def get_wait_till_done_operator(task_id):
+    return DummyOperator(task_id=task_id, trigger_rule=TriggerRule.ALL_DONE)
diff --git a/openverse_catalog/dags/util/popularity/operators.py b/openverse_catalog/dags/util/popularity/operators.py
index 613dcb4954..f0dcb8972a 100644
--- a/openverse_catalog/dags/util/popularity/operators.py
+++ b/openverse_catalog/dags/util/popularity/operators.py
@@ -8,7 +8,6 @@
 
 
 def drop_media_popularity_relations(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -16,12 +15,10 @@ def drop_media_popularity_relations(
         task_id="drop_media_popularity_relations",
         python_callable=sql.drop_media_popularity_relations,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def drop_media_popularity_functions(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -29,12 +26,10 @@ def drop_media_popularity_functions(
         task_id=f"drop_{media_type}_popularity_functions",
         python_callable=sql.drop_media_popularity_functions,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_popularity_metrics(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -42,12 +37,10 @@ def create_media_popularity_metrics(
         task_id=f"create_{media_type}_popularity_metrics_table",
         python_callable=sql.create_media_popularity_metrics,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def update_media_popularity_metrics(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -55,12 +48,10 @@ def update_media_popularity_metrics(
         task_id=f"update_{media_type}_popularity_metrics_table",
         python_callable=sql.update_media_popularity_metrics,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_popularity_percentile(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -68,12 +59,10 @@ def create_media_popularity_percentile(
         task_id=f"create_{media_type}_popularity_percentile",
         python_callable=sql.create_media_popularity_percentile_function,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_popularity_constants(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -81,12 +70,10 @@ def create_media_popularity_constants(
         task_id=f"create_{media_type}_popularity_constants_view",
         python_callable=sql.create_media_popularity_constants_view,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def update_media_popularity_constants(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -94,12 +81,10 @@ def update_media_popularity_constants(
         task_id=f"update_{media_type}_popularity_constants_view",
         python_callable=sql.update_media_popularity_constants,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
 def create_media_standardized_popularity(
-    dag,
     postgres_conn_id,
     media_type="image",
 ):
@@ -107,23 +92,20 @@ def create_media_standardized_popularity(
         task_id=f"create_{media_type}_standardized_popularity",
         python_callable=sql.create_standardized_media_popularity_function,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
-def create_db_view(dag, postgres_conn_id, media_type="image"):
+def create_db_view(postgres_conn_id, media_type="image"):
     return PythonOperator(
         task_id=f"create_{media_type}_view",
         python_callable=sql.create_media_view,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
 
 
-def update_db_view(dag, postgres_conn_id, media_type="image"):
+def update_db_view(postgres_conn_id, media_type="image"):
     return PythonOperator(
         task_id=f"update_{media_type}_view",
         python_callable=sql.update_db_view,
         op_args=[postgres_conn_id, media_type],
-        dag=dag,
     )
diff --git a/tests/dags/test_common_api_workflows.py b/tests/dags/test_common_api_workflows.py
index 0a06fc1480..fb1cef041f 100644
--- a/tests/dags/test_common_api_workflows.py
+++ b/tests/dags/test_common_api_workflows.py
@@ -19,18 +19,9 @@ def test_dags_load_with_no_errors(tmpdir):
 
 
 def test_create_dag_creates_correct_dependencies():
     dag = caw.create_dag("test_source", "test_script_location", "test_dag_id")
-    start_id = "test_source_starting"
     run_id = "get_test_source_images"
-    finish_id = "test_source_finished"
-    start_task = dag.get_task(start_id)
-    assert start_task.upstream_task_ids == set()
-    assert start_task.downstream_task_ids == set([run_id])
     run_task = dag.get_task(run_id)
-    assert run_task.upstream_task_ids == set([start_id])
-    assert run_task.downstream_task_ids == set([finish_id])
-    finish_task = dag.get_task(finish_id)
-    assert finish_task.upstream_task_ids == set([run_id])
-    assert finish_task.downstream_task_ids == set()
+    assert run_task.downstream_task_ids == set()
 
 
 def test_create_dag_adds_schedule_interval():
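
The wiring assertions in these tests lean on the fact that upstream_task_ids and downstream_task_ids are plain sets of task_id strings, so an isolated task compares equal to an empty set. A throwaway sketch of that API (hypothetical DAG, not part of the test suite):

from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator

with DAG(dag_id="wiring_demo", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    only = DummyOperator(task_id="only_task")

task = dag.get_task("only_task")
assert task.upstream_task_ids == set()
assert task.downstream_task_ids == set()
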
diff --git a/tests/dags/util/test_dag_factory.py b/tests/dags/util/test_dag_factory.py
index 50b29ae362..10ecf79e3f 100644
--- a/tests/dags/util/test_dag_factory.py
+++ b/tests/dags/util/test_dag_factory.py
@@ -11,19 +11,13 @@ def test_create_day_partitioned_ingestion_dag_with_single_layer_dependencies():
     wait0_id = "wait_L0"
     ingest1_id = "ingest_1"
     ingest2_id = "ingest_2"
-    finish_id = "test_dag_Finished"
     today_task = dag.get_task(today_id)
     assert today_task.upstream_task_ids == set()
-    assert today_task.downstream_task_ids == set([wait0_id, finish_id])
+    assert today_task.downstream_task_ids == set([wait0_id])
     ingest1_task = dag.get_task(ingest1_id)
     assert ingest1_task.upstream_task_ids == set([wait0_id])
-    assert ingest1_task.downstream_task_ids == set([finish_id])
     ingest2_task = dag.get_task(ingest2_id)
     assert ingest2_task.upstream_task_ids == set([wait0_id])
-    assert ingest2_task.downstream_task_ids == set([finish_id])
-    finish_task = dag.get_task(finish_id)
-    assert finish_task.upstream_task_ids == set([today_id, ingest1_id, ingest2_id])
-    assert finish_task.downstream_task_ids == set()
 
 
 def test_create_day_partitioned_ingestion_dag_with_multi_layer_dependencies():
@@ -40,27 +34,15 @@ def test_create_day_partitioned_ingestion_dag_with_multi_layer_dependencies():
     wait1_id = "wait_L1"
     ingest1_id = "ingest_1"
     ingest2_id = "ingest_2"
     ingest3_id = "ingest_3"
     ingest4_id = "ingest_4"
     ingest5_id = "ingest_5"
-    finish_id = "test_dag_Finished"
     today_task = dag.get_task(today_id)
     assert today_task.upstream_task_ids == set()
-    assert today_task.downstream_task_ids == set([wait0_id, finish_id])
     ingest1_task = dag.get_task(ingest1_id)
     assert ingest1_task.upstream_task_ids == set([wait0_id])
-    assert ingest1_task.downstream_task_ids == set([wait1_id, finish_id])
     ingest2_task = dag.get_task(ingest2_id)
     assert ingest2_task.upstream_task_ids == set([wait0_id])
-    assert ingest2_task.downstream_task_ids == set([wait1_id, finish_id])
     ingest3_task = dag.get_task(ingest3_id)
     assert ingest3_task.upstream_task_ids == set([wait1_id])
-    assert ingest3_task.downstream_task_ids == set([finish_id])
     ingest4_task = dag.get_task(ingest4_id)
     assert ingest4_task.upstream_task_ids == set([wait1_id])
-    assert ingest4_task.downstream_task_ids == set([finish_id])
     ingest5_task = dag.get_task(ingest5_id)
     assert ingest5_task.upstream_task_ids == set([wait1_id])
-    assert ingest5_task.downstream_task_ids == set([finish_id])
-    finish_task = dag.get_task(finish_id)
-    assert finish_task.upstream_task_ids == set(
-        [today_id, ingest1_id, ingest2_id, ingest3_id, ingest4_id, ingest5_id]
-    )
-    assert finish_task.downstream_task_ids == set()
diff --git a/tests/dags/util/test_operator_util.py b/tests/dags/util/test_operator_util.py
index e9ca133978..b6f9f6651d 100644
--- a/tests/dags/util/test_operator_util.py
+++ b/tests/dags/util/test_operator_util.py
@@ -10,8 +10,7 @@ def dated(dag_date):
 
 
 def test_get_runner_operator_creates_valid_string():
-    dag = DAG(dag_id="test_dag", start_date=datetime.strptime("2019-01-01", "%Y-%m-%d"))
-    runner = op_util.get_runner_operator(dag, "test_source", "/test/script/location.py")
+    runner = op_util.get_runner_operator("test_source", "/test/script/location.py")
     expected_command = "python /test/script/location.py --mode default"
     assert runner.bash_command == expected_command
 
@@ -22,16 +21,15 @@ def test_get_dated_main_runner_handles_zero_shift(capsys):
         tzinfo=timezone.utc
     )
     main_func = dated
-    runner = op_util.get_dated_main_runner_operator(
-        dag, main_func, timedelta(minutes=1)
-    )
-    ti = TaskInstance(runner, execution_date)
-    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
-    # main_func.assert_called_with('2019-01-01')
-    # Mocking main_func causes errors because Airflow JSON-encodes it,
-    # and MagicMock is not JSON-serializable.
-    captured = capsys.readouterr()
-    assert captured.out == "2019-01-01\n"
+    with dag:
+        runner = op_util.get_dated_main_runner_operator(main_func, timedelta(minutes=1))
+        ti = TaskInstance(runner, execution_date)
+        ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+        # main_func.assert_called_with('2019-01-01')
+        # Mocking main_func causes errors because Airflow JSON-encodes it,
+        # and MagicMock is not JSON-serializable.
+        captured = capsys.readouterr()
+        assert captured.out == "2019-01-01\n"
 
 
 def test_get_dated_main_runner_handles_day_shift(capsys):
@@ -40,13 +38,14 @@
         tzinfo=timezone.utc
     )
     main_func = dated
-    runner = op_util.get_dated_main_runner_operator(
-        dag, main_func, timedelta(minutes=1), day_shift=1
-    )
-    ti = TaskInstance(runner, execution_date)
-    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
-    # main_func.assert_called_with('2018-12-31')
-    # Mocking main_func causes errors because Airflow JSON-encodes it,
-    # and MagicMock is not JSON-serializable.
-    captured = capsys.readouterr()
-    assert captured.out == "2018-12-31\n"
+    with dag:
+        runner = op_util.get_dated_main_runner_operator(
+            main_func, timedelta(minutes=1), day_shift=1
+        )
+        ti = TaskInstance(runner, execution_date)
+        ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
+        # main_func.assert_called_with('2018-12-31')
+        # Mocking main_func causes errors because Airflow JSON-encodes it,
+        # and MagicMock is not JSON-serializable.
+        captured = capsys.readouterr()
+        assert captured.out == "2018-12-31\n"
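
A short note on the test changes above: because get_dated_main_runner_operator no longer receives a dag argument, the tests now create the operator inside `with dag:` so the task belongs to a DAG before TaskInstance runs it in test mode. A condensed restatement of the zero-shift test, trimmed for illustration (same calls as above; the dag variable is assumed to be defined earlier in the test, as in the unchanged context):

from datetime import datetime, timedelta, timezone

from airflow.models import DAG, TaskInstance
from util import operator_util as op_util


def dated(dag_date):
    print(dag_date)


dag = DAG(dag_id="test_dag", start_date=datetime(2019, 1, 1))
execution_date = datetime(2019, 1, 1, tzinfo=timezone.utc)

with dag:
    runner = op_util.get_dated_main_runner_operator(dated, timedelta(minutes=1))
    ti = TaskInstance(runner, execution_date)
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)  # prints "2019-01-01"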