forked from calogica/dbt-expectations
-
Notifications
You must be signed in to change notification settings - Fork 0
/
expect_column_values_to_be_within_n_moving_stdevs.sql
141 lines (120 loc) · 4.64 KB
/
expect_column_values_to_be_within_n_moving_stdevs.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
{%- macro _get_metric_expression(metric_column, take_logs) -%}
{#-
    Renders a null-safe SQL expression for a metric column.

    metric_column: SQL text (column name or expression) to wrap.
    take_logs:     when truthy, the column is wrapped in nullif(<col>, 0), so a
                   zero becomes NULL, and handed to dbt_expectations.log_natural;
                   otherwise the column is used as-is.

    In both cases the result is wrapped in coalesce(<expr>, 0), so a NULL
    renders as 0 in the final expression.
-#}
{%- if not take_logs -%}
coalesce({{ metric_column }}, 0)
{%- else -%}
{%- set zero_as_null = "nullif(" ~ metric_column ~ ", 0)" -%}
coalesce({{ dbt_expectations.log_natural(zero_as_null) }}, 0)
{%- endif -%}
{%- endmacro -%}
{% test expect_column_values_to_be_within_n_moving_stdevs(model,
        column_name,
        date_column_name,
        period='day',
        lookback_periods=1,
        trend_periods=7,
        test_periods=14,
        sigma_threshold=3,
        sigma_threshold_upper=None,
        sigma_threshold_lower=None,
        take_diffs=true,
        take_logs=true
    ) -%}
{#-
    Generic test entry point. It holds the user-facing defaults and forwards
    every argument, positionally and unchanged, to the implementation that
    adapter.dispatch resolves for
    'test_expect_column_values_to_be_within_n_moving_stdevs' in the
    'dbt_expectations' namespace.
-#}
{%- set dispatched_test = adapter.dispatch('test_expect_column_values_to_be_within_n_moving_stdevs', 'dbt_expectations') -%}
{{ dispatched_test(
        model,
        column_name,
        date_column_name,
        period,
        lookback_periods,
        trend_periods,
        test_periods,
        sigma_threshold,
        sigma_threshold_upper,
        sigma_threshold_lower,
        take_diffs,
        take_logs
    ) }}
{%- endtest %}
{% macro default__test_expect_column_values_to_be_within_n_moving_stdevs(model,
        column_name,
        date_column_name,
        period,
        lookback_periods,
        trend_periods,
        test_periods,
        sigma_threshold,
        sigma_threshold_upper,
        sigma_threshold_lower,
        take_diffs,
        take_logs
    ) %}
{#-
    Default implementation: returns the periods whose metric lies outside a
    band of moving standard deviations around its moving average.

    Steps performed by the rendered query:
      1. sum(column_name) per date_column_name truncated to `period`.
      2. If take_diffs: subtract the value `lookback_periods` rows earlier
         (lag ordered by period). If take_logs: each operand is passed through
         the log variant of _get_metric_expression. Result: metric_test_value.
      3. Rolling avg / stddev of metric_test_value over the `trend_periods`
         rows preceding (and excluding) the current row.
      4. metric_test_sigma = (value - rolling avg) / rolling stddev, with a
         zero stddev turned into NULL to avoid division by zero.
      5. Keep rows from the last `test_periods` periods before the current
         (incomplete) period whose sigma is outside
         [sigma_threshold_lower, sigma_threshold_upper].

    sigma_threshold_upper / sigma_threshold_lower default to +sigma_threshold
    and -sigma_threshold when not supplied.

    NOTE(review): rows whose sigma is NULL (no history yet, or zero stddev)
    make the final predicate evaluate to NULL and are therefore not returned.
-#}
{#- Compare against none rather than truthiness so an explicit 0 threshold is honoured. -#}
{%- set sigma_threshold_upper = sigma_threshold_upper if sigma_threshold_upper is not none else sigma_threshold -%}
{%- set sigma_threshold_lower = sigma_threshold_lower if sigma_threshold_lower is not none else -1 * sigma_threshold -%}
with metric_values as (

    with grouped_metric_values as (

        select
            {{ dbt_utils.date_trunc(period, date_column_name) }} as metric_period,
            sum({{ column_name }}) as agg_metric_value
        from
            {{ model }}
        group by
            1

    ),
    {%- if take_diffs %}
    grouped_metric_values_with_priors as (

        select
            *,
            lag(agg_metric_value, {{ lookback_periods }}) over(order by metric_period) as prior_agg_metric_value
        from
            grouped_metric_values d

    )
    select
        *,
        {{ dbt_expectations._get_metric_expression("agg_metric_value", take_logs) }}
        -
        {{ dbt_expectations._get_metric_expression("prior_agg_metric_value", take_logs) }}
        as metric_test_value
    from
        grouped_metric_values_with_priors d

    {%- else %}

    select
        *,
        {#- The alias is required: downstream CTEs reference metric_test_value. #}
        {{ dbt_expectations._get_metric_expression("agg_metric_value", take_logs) }}
        as metric_test_value
    from
        grouped_metric_values

    {%- endif %}

),
metric_moving_calcs as (

    select
        *,
        avg(metric_test_value)
            over(order by metric_period rows
                    between {{ trend_periods }} preceding and 1 preceding) as metric_test_rolling_average,
        stddev(metric_test_value)
            over(order by metric_period rows
                    between {{ trend_periods }} preceding and 1 preceding) as metric_test_rolling_stddev
    from
        metric_values

),
metric_sigma as (

    select
        *,
        (metric_test_value - metric_test_rolling_average) as metric_test_delta,
        (metric_test_value - metric_test_rolling_average)/nullif(metric_test_rolling_stddev, 0) as metric_test_sigma
    from
        metric_moving_calcs

)
select
    *
from
    metric_sigma
where
    metric_period >= cast(
            {{ dbt_utils.dateadd(period, -test_periods, dbt_utils.date_trunc(period, dbt_date.now())) }}
            as {{ dbt_utils.type_timestamp() }})
    and
    metric_period < {{ dbt_utils.date_trunc(period, dbt_date.now()) }}
    and

    not (
        metric_test_sigma >= {{ sigma_threshold_lower }} and
        metric_test_sigma <= {{ sigma_threshold_upper }}
    )
{%- endmacro -%}