-
Notifications
You must be signed in to change notification settings - Fork 39
/
example_amazon_s3_snowflake_transform.py
98 lines (80 loc) · 2.48 KB
/
example_amazon_s3_snowflake_transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import os
import time
from datetime import datetime, timedelta
import pandas as pd
# Uses data from https://www.kaggle.com/c/shelter-animal-outcomes
from airflow.decorators import dag
from astro import sql as aql
from astro.files import File
from astro.sql.table import Metadata, Table
@aql.transform()
def combine_data(center_1: Table, center_2: Table):
    """Union the rows of two adoption-center tables into a single result set.

    The ``{{...}}`` placeholders are resolved by the Astro SDK to the
    actual Snowflake table names at run time.
    """
    union_sql = (
        "SELECT * FROM {{center_1}}\n"
        "    UNION SELECT * FROM {{center_2}}"
    )
    return union_sql
@aql.transform()
def clean_data(input_table: Table):
    """Filter out guinea-pig rows from the combined adoptions table.

    Returns a templated SQL string; ``{{input_table}}`` is substituted
    by the Astro SDK with the upstream table name.
    """
    filter_sql = (
        "SELECT *\n"
        "    FROM {{input_table}} WHERE type NOT LIKE 'Guinea Pig'\n"
        "    "
    )
    return filter_sql
@aql.dataframe()
def aggregate_data(df: pd.DataFrame):
    """Pivot the cleaned adoptions into per-date counts for each animal type.

    Rows are grouped by ``date``; each animal ``type`` becomes a column
    holding the count of ``name`` entries. Column labels are lowercased
    so they load consistently into Snowflake.
    """
    counts = df.pivot_table(
        index="date", values="name", columns=["type"], aggfunc="count"
    )
    counts = counts.reset_index()
    counts.columns = counts.columns.str.lower()
    return counts
@dag(
    start_date=datetime(2021, 1, 1),
    max_active_runs=1,
    schedule_interval="@daily",
    default_args={
        "email_on_failure": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    catchup=False,
)
def example_amazon_s3_snowflake_transform():
    """ELT example: stage two adoption-center CSVs from S3 into Snowflake,
    union and filter them, then write pivoted daily adoption counts.
    """
    bucket = os.getenv("S3_BUCKET", "s3://tmp9")

    def snowflake_table(table_name):
        # Build a Snowflake-backed Table handle; each call creates its own
        # Metadata instance from the SNOWFLAKE_* environment variables.
        return Table(
            name=table_name,
            metadata=Metadata(
                database=os.environ["SNOWFLAKE_DATABASE"],
                schema=os.environ["SNOWFLAKE_SCHEMA"],
            ),
            conn_id="snowflake_conn",
        )

    staged_center_1 = aql.load_file(
        input_file=File(path=f"{bucket}/ADOPTION_CENTER_1_unquoted.csv"),
        output_table=snowflake_table("ADOPTION_CENTER_1"),
    )
    staged_center_2 = aql.load_file(
        input_file=File(path=f"{bucket}/ADOPTION_CENTER_2_unquoted.csv"),
        output_table=snowflake_table("ADOPTION_CENTER_2"),
    )

    unioned = combine_data(
        center_1=staged_center_1,
        center_2=staged_center_2,
    )
    filtered = clean_data(unioned)

    # NOTE: the output table name embeds the DAG-parse-time epoch, so a new
    # table is produced each time the file is parsed (original behavior kept).
    aggregate_data(
        filtered,
        output_table=snowflake_table(
            "aggregated_adoptions_" + str(int(time.time()))
        ),
    )
    # Remove the temporary tables the SDK created during this run.
    aql.cleanup()
dag = example_amazon_s3_snowflake_transform()