table_export_dag.py
"""
Regularly exports a table's rows to an S3 bucket as JSON files
A detailed tutorial is available at https://community.crate.io/t/cratedb-and-apache-airflow-automating-data-export-to-s3/901
"""
import os

import pendulum
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.utils.task_group import TaskGroup

from include.table_exports import TABLES
with DAG(
    dag_id="cratedb_table_export",
    start_date=pendulum.datetime(2021, 11, 11, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Marker tasks framing the export task group
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    # One COPY task per configured table
    with TaskGroup(group_id="table_exports") as tg1:
        for export_table in TABLES:
            PostgresOperator(
                task_id=f"copy_{export_table['table']}",
                postgres_conn_id="cratedb_connection",
                # Export yesterday's rows ({{ macros.ds_add(ds, -1) }}) to a
                # date-suffixed S3 location; credentials are read from the environment.
                sql="""
                    COPY {table} WHERE DATE_TRUNC('day', {timestamp_column}) = '{day}'
                    TO DIRECTORY 's3://{access}:{secret}@{target_bucket}-{day}';
                """.format(
                    table=export_table["table"],
                    timestamp_column=export_table["timestamp_column"],
                    target_bucket=export_table["target_bucket"],
                    day="{{ macros.ds_add(ds, -1) }}",
                    access=os.environ.get("ACCESS_KEY_ID"),
                    secret=os.environ.get("SECRET_ACCESS_KEY"),
                ),
            )

    chain(start, tg1, end)
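
The DAG reads its configuration from include/table_exports.py, which is not shown on this page. Judging from the keys accessed above (table, timestamp_column, target_bucket), that module presumably defines TABLES as a list of dictionaries; the sketch below uses placeholder values, not the repository's actual configuration.

# include/table_exports.py -- hypothetical sketch of the expected structure
TABLES = [
    {
        "table": "doc.raw_metrics",                    # placeholder table name
        "timestamp_column": "ts",                      # column used to select yesterday's rows
        "target_bucket": "my-export-bucket/metrics",   # S3 bucket (and prefix) for COPY ... TO DIRECTORY
    },
]

With an entry like this and a logical date of 2021-11-11, the templated statement would render roughly as COPY doc.raw_metrics WHERE DATE_TRUNC('day', ts) = '2021-11-10' TO DIRECTORY 's3://<ACCESS_KEY_ID>:<SECRET_ACCESS_KEY>@my-export-bucket/metrics-2021-11-10'; the credentials are substituted from environment variables when the DAG file is parsed, while the date comes from the Jinja macro at run time.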