
Commit

Merge pull request #238 from WordPress/rm-get-log-operator
zackkrida committed Oct 11, 2021
2 parents 926e6d4 + 1992ba1 commit a966655
Showing 25 changed files with 101 additions and 350 deletions.
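
All of the diffs below apply the same refactor: the start/finish marker tasks created with util.operator_util.get_log_operator are dropped, and operators instantiated inside a "with dag:" block no longer receive an explicit dag / dag=dag argument, because Airflow's DAG context manager attaches each task to the active DAG automatically. A minimal sketch of the resulting pattern (the DAG id and callable below are illustrative placeholders, not code from this repository):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator


def clean_up():
    # Stand-in for the real callable a task would run.
    pass


dag = DAG(
    dag_id="example_cleanup_workflow",  # illustrative id only
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
)

with dag:
    # No dag=dag argument: the DAG context manager assigns the task to `dag`.
    PythonOperator(
        task_id="log_cleaner_operator",
        python_callable=clean_up,
    )
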
16 changes: 2 additions & 14 deletions openverse_catalog/dags/airflow_log_cleanup_workflow.py
@@ -19,7 +19,6 @@
 from datetime import datetime, timedelta

 import jinja2
-import util.operator_util as ops
 from airflow.configuration import conf
 from airflow.models import DAG
 from airflow.operators.python import PythonOperator
@@ -52,10 +51,7 @@
 }


-def get_log_cleaner_operator(
-    dag,
-    base_log_folder,
-):
+def get_log_cleaner_operator(base_log_folder):
     return PythonOperator(
         task_id="log_cleaner_operator",
         python_callable=log_cleanup.clean_up,
@@ -64,7 +60,6 @@ def get_log_cleaner_operator(
             "{{ params.get('maxLogAgeInDays') }}",
             "{{ params.get('enableDelete') }}",
         ],
-        dag=dag,
     )


@@ -88,14 +83,7 @@ def create_dag(
     )

     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = get_log_cleaner_operator(
-            dag,
-            BASE_LOG_FOLDER,
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        get_log_cleaner_operator(BASE_LOG_FOLDER)

     return dag

@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta

-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators

@@ -51,11 +50,7 @@ def create_dag(
     )

     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_smithsonian_unit_code_operator(dag, postgres_conn_id)
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_smithsonian_unit_code_operator(postgres_conn_id)

     return dag

13 changes: 3 additions & 10 deletions openverse_catalog/dags/cleaner_workflow.py
@@ -5,7 +5,7 @@

 from airflow import DAG
 from airflow.operators.python import PythonOperator
-from util import config, operator_util, pg_cleaner
+from util import config, pg_cleaner


 logging.basicConfig(
@@ -43,18 +43,12 @@ def create_id_partitioned_cleaner_dag(
     )
     hex_prefixes = pg_cleaner.hex_counter(prefix_length)
     with dag:
-        cleaner_list = [
-            _get_pg_cleaner_operator(dag, prefix, postgres_conn_id)
-            for prefix in hex_prefixes
-        ]
-        start_task = operator_util.get_log_operator(dag, dag.dag_id, "Started")
-        end_task = operator_util.get_log_operator(dag, dag.dag_id, "Ended")
-        start_task >> cleaner_list >> end_task
+        [_get_pg_cleaner_operator(prefix, postgres_conn_id) for prefix in hex_prefixes]

     return dag


 def _get_pg_cleaner_operator(
-    dag,
     prefix,
     postgres_conn_id,
     desired_length=DESIRED_PREFIX_LENGTH,
@@ -70,7 +64,6 @@ def _get_pg_cleaner_operator(
             "delay_minutes": delay,
         },
         depends_on_past=False,
-        dag=dag,
     )


8 changes: 2 additions & 6 deletions openverse_catalog/dags/common_api_workflows.py
@@ -4,7 +4,7 @@
 import util.config as conf
 from airflow import DAG
 from croniter import croniter
-from util.operator_util import get_log_operator, get_runner_operator
+from util.operator_util import get_runner_operator


 logging.basicConfig(
@@ -52,11 +52,7 @@ def create_dag(
     )

     with dag:
-        start_task = get_log_operator(dag, source, "starting")
-        run_task = get_runner_operator(dag, source, script_location)
-        end_task = get_log_operator(dag, source, "finished")
-
-        start_task >> run_task >> end_task
+        get_runner_operator(source, script_location)

     return dag

12 changes: 2 additions & 10 deletions openverse_catalog/dags/commoncrawl_etl.py
@@ -165,16 +165,11 @@
     concurrency=1,
     catchup=False,
 ) as dag:
-
-    job_start_logger = operators.get_log_operator(dag, "Starting")
-
     check_for_cc_index = operators.get_check_cc_index_in_s3_sensor(
-        dag,
         AWS_CONN_ID,
     )

     check_for_wat_file = operators.get_check_wat_file_in_s3_sensor(
-        dag,
         AWS_CONN_ID,
     )

@@ -207,15 +202,12 @@
         AWS_CONN_ID,
     )

-    job_done_logger = operators.get_log_operator(dag, "Finished")
-
     (
-        job_start_logger
-        >> check_for_cc_index
+        check_for_cc_index
         >> check_for_wat_file
         >> [extract_script_loader, cluster_bootstrap_loader]
         >> job_flow_creator
         >> job_sensor
         >> job_flow_terminator
     )
-    [job_flow_creator, job_sensor, job_flow_terminator] >> job_done_logger
+    [job_flow_creator, job_sensor, job_flow_terminator]
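
For context on the chaining syntax used in commoncrawl_etl.py above: Airflow's >> operator sets downstream dependencies, a list fans a dependency out to several parallel tasks, and a bare list expression on its own line adds no dependency edges. A small sketch with hypothetical task names (not the tasks in this DAG):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def noop():
    pass


with DAG("chaining_sketch", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    extract = PythonOperator(task_id="extract", python_callable=noop)
    load_a = PythonOperator(task_id="load_a", python_callable=noop)
    load_b = PythonOperator(task_id="load_b", python_callable=noop)
    report = PythonOperator(task_id="report", python_callable=noop)

    # extract runs first, load_a and load_b run in parallel, then report.
    extract >> [load_a, load_b] >> report
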
@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta

-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators

@@ -51,13 +50,7 @@ def create_dag(
     )

     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_europeana_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_europeana_sub_provider_update_operator(postgres_conn_id)

     return dag

@@ -8,7 +8,6 @@
 import os
 from datetime import datetime, timedelta

-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators

@@ -51,13 +50,7 @@ def create_dag(
     )

     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task = operators.get_flickr_sub_provider_update_operator(
-            dag, postgres_conn_id
-        )
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task >> end_task
+        operators.get_flickr_sub_provider_update_operator(postgres_conn_id)

     return dag

9 changes: 2 additions & 7 deletions openverse_catalog/dags/image_expiration_workflow.py
@@ -7,7 +7,6 @@
 import os
 from datetime import datetime, timedelta

-import util.operator_util as ops
 from airflow import DAG
 from util.loader import operators, sql

@@ -50,14 +49,10 @@ def create_dag(
     )

     with dag:
-        start_task = ops.get_log_operator(dag, dag.dag_id, "Starting")
-        run_task_list = [
-            operators.get_image_expiration_operator(dag, postgres_conn_id, provider)
+        [
+            operators.get_image_expiration_operator(postgres_conn_id, provider)
             for provider in sql.OLDEST_PER_PROVIDER
         ]
-        end_task = ops.get_log_operator(dag, dag.dag_id, "Finished")
-
-        start_task >> run_task_list >> end_task

     return dag

18 changes: 5 additions & 13 deletions openverse_catalog/dags/loader_workflow.py
@@ -111,55 +111,47 @@ def create_dag(

     with dag:
         stage_oldest_tsv_file = operators.get_file_staging_operator(
-            dag,
             output_dir,
             minimum_file_age_minutes,
             identifier=identifier,
         )
         create_loading_table = operators.get_table_creator_operator(
-            dag,
             postgres_conn_id,
             identifier=identifier,
         )
         copy_to_s3 = operators.get_copy_to_s3_operator(
-            dag,
             output_dir,
             storage_bucket,
             aws_conn_id,
             identifier=identifier,
         )
         load_s3_data = operators.get_load_s3_data_operator(
-            dag,
             storage_bucket,
             aws_conn_id,
             postgres_conn_id,
             overwrite=overwrite,
             identifier=identifier,
         )
-        one_failed_s3 = operators.get_one_failed_switch(dag, "s3")
+        one_failed_s3 = operators.get_one_failed_switch("s3")
         load_local_data = operators.get_load_local_data_operator(
-            dag,
             output_dir,
             postgres_conn_id,
             overwrite=overwrite,
             identifier=identifier,
         )
-        one_success_save = operators.get_one_success_switch(dag, "save")
-        all_done_save = operators.get_all_done_switch(dag, "save")
-        all_failed_save = operators.get_all_failed_switch(dag, "save")
+        one_success_save = operators.get_one_success_switch("save")
+        all_done_save = operators.get_all_done_switch("save")
+        all_failed_save = operators.get_all_failed_switch("save")
         delete_staged_file = operators.get_file_deletion_operator(
-            dag,
             output_dir,
             identifier=identifier,
         )
-        one_failed_delete = operators.get_one_failed_switch(dag, "delete")
+        one_failed_delete = operators.get_one_failed_switch("delete")
         drop_loading_table = operators.get_drop_table_operator(
-            dag,
             postgres_conn_id,
             identifier=identifier,
         )
         move_staged_failures = operators.get_failure_moving_operator(
-            dag,
             output_dir,
             identifier=identifier,
         )
15 changes: 1 addition & 14 deletions openverse_catalog/dags/recreate_audio_popularity_calculation.py
@@ -12,7 +12,6 @@
 from datetime import datetime, timedelta

 from airflow import DAG
-from util.operator_util import get_log_operator
 from util.popularity import operators


@@ -54,58 +53,46 @@ def create_dag(
         catchup=False,
     )
     with dag:
-        start_task = get_log_operator(dag, DAG_ID, "Starting")
         drop_relations = operators.drop_media_popularity_relations(
-            dag,
             postgres_conn_id,
             "audio",
         )
         drop_functions = operators.drop_media_popularity_functions(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_metrics = operators.create_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             "audio",
         )
         update_metrics = operators.update_media_popularity_metrics(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_percentile = operators.create_media_popularity_percentile(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_constants = operators.create_media_popularity_constants(
-            dag,
             postgres_conn_id,
             "audio",
         )
         create_popularity = operators.create_media_standardized_popularity(
-            dag,
             postgres_conn_id,
             "audio",
        )
         create_db_view = operators.create_db_view(
-            dag,
             postgres_conn_id,
             "audio",
         )
-        end_task = get_log_operator(dag, DAG_ID, "Finished")

         (
-            start_task
-            >> [drop_relations, drop_functions]
+            [drop_relations, drop_functions]
             >> create_metrics
             >> [update_metrics, create_percentile]
             >> create_constants
             >> create_popularity
             >> create_db_view
-            >> end_task
         )

     return dag
