Feat/vanilla ae refactor (#379)
- Refactor Vanilla AE
- Support static filters in Druid
- Add Percentile Scaler

---------

Signed-off-by: Avik Basu <ab93@users.noreply.github.com>
Co-authored-by: Gulshan Bhatia <gulshan_bhatia@intuit.com>
Co-authored-by: Kushal Batra <34571348+s0nicboOm@users.noreply.github.com>
Co-authored-by: Nandita Koppisetty <nandita.iitkgp@gmail.com>
4 people committed May 14, 2024
1 parent f29f771 commit 934d8cf
Showing 32 changed files with 612 additions and 285 deletions.
35 changes: 30 additions & 5 deletions numalogic/backtest/_prom.py
@@ -49,6 +49,9 @@ class OutDataFrames:
thresh_out: pd.DataFrame
postproc_out: pd.DataFrame
unified_out: pd.DataFrame
static_out: Optional[pd.DataFrame] = None
static_features: Optional[pd.DataFrame] = None
adjusted_unified: Optional[pd.DataFrame] = None


class PromBacktester:
@@ -135,9 +138,14 @@ def train_models(
x_train = df_train.to_numpy(dtype=np.float32)
LOGGER.info("Training data shape: %s", x_train.shape)

if self.nlconf.trainer.transforms:
train_txs = PreprocessFactory().get_pipeline_instance(self.nlconf.trainer.transforms)
else:
train_txs = None
artifacts = UDFFactory.get_udf_cls("promtrainer").compute(
model=ModelFactory().get_instance(self.nlconf.model),
input_=x_train,
trainer_transform=train_txs,
preproc_clf=PreprocessFactory().get_pipeline_instance(self.nlconf.preprocess),
threshold_clf=ThresholdFactory().get_instance(self.nlconf.threshold),
numalogic_cfg=self.nlconf,
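
The trainer stage can now carry its own transform pipeline, resolved through PreprocessFactory just like the preprocess stage. A minimal config sketch, assuming TrainerConf exposes the transforms field used above and that NumalogicConf, TrainerConf and ModelInfo are importable from numalogic.config as elsewhere in the repo:

from numalogic.config import ModelInfo, NumalogicConf, TrainerConf

# Hypothetical: apply a DifferenceTransform only while training;
# train_models() resolves it via PreprocessFactory().get_pipeline_instance().
conf = NumalogicConf(
    trainer=TrainerConf(transforms=[ModelInfo(name="DifferenceTransform", stateful=False)]),
)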
@@ -238,6 +246,8 @@ def generate_scores(

x_recon = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32)
raw_scores = np.zeros((len(ds), self.seq_len, n_feat), dtype=np.float32)
unified_raw_scores = np.zeros((len(ds), 1), dtype=np.float32)

feature_scores = np.zeros((len(ds), n_feat), dtype=np.float32)
unified_scores = np.zeros((len(ds), 1), dtype=np.float32)

@@ -253,12 +263,17 @@

winscores = postproc_udf.compute_feature_scores(
raw_scores[idx], self.nlconf.score.window_agg
) # (nfeat,)

unified_raw_scores[idx] = postproc_udf.compute_unified_score(
winscores,
feat_agg_conf=self.nlconf.score.feature_agg,
)

feature_scores[idx] = postproc_udf.compute_postprocess(postproc_func, winscores)

unified_scores[idx] = postproc_udf.compute_unified_score(
feature_scores[idx], self.nlconf.score.feature_agg
unified_scores[idx] = postproc_udf.compute_postprocess(
postproc_func, unified_raw_scores[idx]
)

x_recon = self.window_inverse(x_recon)
@@ -274,14 +289,24 @@
[np.full((len(x_test) - len(ds), 1), fill_value=np.nan), unified_scores]
)

return self._construct_output(
out_dfs = self._construct_output(
df_test,
preproc_out=x_scaled,
nn_out=x_recon,
thresh_out=raw_scores,
postproc_out=feature_scores,
unified_out=unified_scores,
)
if self.nlconf.score.adjust:
static_scores = self.generate_static_scores(df_test)
out_dfs.static_out = static_scores["static_unified"]
out_dfs.static_features = static_scores["static_features"]

out_dfs.adjusted_unified = pd.concat(
[out_dfs.unified_out, out_dfs.static_out], axis=1
).max(axis=1)

return out_dfs

def generate_static_scores(self, df: pd.DataFrame) -> pd.DataFrame:
if not self.nlconf.score.adjust:
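
When score adjustment is enabled, the adjusted unified score is simply the row-wise maximum of the model's unified score and the static score, so the static rules can raise the reported score but never lower it. An illustrative sketch with made-up values and hypothetical column names, mirroring the pd.concat(...).max(axis=1) call above:

import pandas as pd

unified_out = pd.DataFrame({"unified": [0.4, 0.7]})
static_out = pd.DataFrame({"static_unified": [0.9, 0.1]})
adjusted_unified = pd.concat([unified_out, static_out], axis=1).max(axis=1)
# -> [0.9, 0.7]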
@@ -305,12 +330,12 @@ def generate_static_scores(self, df: pd.DataFrame) -> pd.DataFrame:
)
feature_scores = np.vstack(
[
np.full((self.seq_len - 1, len(metrics)), fill_value=np.nan),
np.full((len(x_test) - len(ds), len(metrics)), fill_value=np.nan),
feature_scores,
]
)
unified_scores = np.vstack(
[np.full((self.seq_len - 1, 1), fill_value=np.nan), unified_scores]
[np.full((len(x_test) - len(ds), 1), fill_value=np.nan), unified_scores]
)
dfs = {
"input": df,
2 changes: 1 addition & 1 deletion numalogic/config/_config.py
@@ -157,7 +157,7 @@ class NumalogicConf:
trainer: TrainerConf = field(default_factory=TrainerConf)
preprocess: list[ModelInfo] = field(default_factory=list)
threshold: ModelInfo = field(default_factory=lambda: ModelInfo(name="StdDevThreshold"))
postprocess: ModelInfo = field(
postprocess: Optional[ModelInfo] = field(
default_factory=lambda: ModelInfo(name="TanhNorm", stateful=False)
)
score: ScoreConf = field(default_factory=lambda: ScoreConf())
12 changes: 10 additions & 2 deletions numalogic/config/factory.py
@@ -51,6 +51,8 @@ class PreprocessFactory(_ObjectFactory):
GaussianNoiseAdder,
DifferenceTransform,
FlattenVector,
PercentileScaler,
ExpMovingAverage,
)

_CLS_MAP: ClassVar[dict] = {
@@ -65,6 +67,8 @@ class PreprocessFactory(_ObjectFactory):
"GaussianNoiseAdder": GaussianNoiseAdder,
"DifferenceTransform": DifferenceTransform,
"FlattenVector": FlattenVector,
"PercentileScaler": PercentileScaler,
"ExpMovingAverage": ExpMovingAverage,
}

def get_pipeline_instance(self, objs_info: list[ModelInfo]):
@@ -82,9 +86,13 @@ def get_pipeline_instance(self, objs_info: list[ModelInfo]):
class PostprocessFactory(_ObjectFactory):
"""Factory class to create postprocess instances."""

from numalogic.transforms import TanhNorm, ExpMovingAverage
from numalogic.transforms import TanhNorm, ExpMovingAverage, SigmoidNorm

_CLS_MAP: ClassVar[dict] = {"TanhNorm": TanhNorm, "ExpMovingAverage": ExpMovingAverage}
_CLS_MAP: ClassVar[dict] = {
"TanhNorm": TanhNorm,
"ExpMovingAverage": ExpMovingAverage,
"SigmoidNorm": SigmoidNorm,
}


class ThresholdFactory(_ObjectFactory):
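
With PercentileScaler and SigmoidNorm now registered, both can be selected by name from config. A minimal sketch, assuming ModelInfo is importable from numalogic.config and that get_instance()/get_pipeline_instance() behave as they do elsewhere on this page:

from numalogic.config import ModelInfo
from numalogic.config.factory import PostprocessFactory, PreprocessFactory

# "PercentileScaler" resolves through the extended PreprocessFactory._CLS_MAP.
preproc_pipeline = PreprocessFactory().get_pipeline_instance([ModelInfo(name="PercentileScaler")])

# "SigmoidNorm" resolves through the extended PostprocessFactory._CLS_MAP.
postproc = PostprocessFactory().get_instance(ModelInfo(name="SigmoidNorm"))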
6 changes: 5 additions & 1 deletion numalogic/connectors/__init__.py
@@ -10,7 +10,6 @@
RDSConf,
RDSFetcherConf,
)
from numalogic.connectors.rds import RDSFetcher
from numalogic.connectors.prometheus import PrometheusFetcher

__all__ = [
@@ -26,6 +25,11 @@
"RDSFetcherConf",
]

if find_spec("boto3"):
from numalogic.connectors.rds import RDSFetcher # noqa: F401

__all__.append("RDSFetcher")

if find_spec("pydruid"):
from numalogic.connectors.druid import DruidFetcher # noqa: F401

8 changes: 8 additions & 0 deletions numalogic/connectors/_config.py
@@ -36,11 +36,19 @@ class Pivot:
index: str = "timestamp"
columns: list[str] = field(default_factory=list)
value: list[str] = field(default_factory=lambda: ["count"])
agg: list[str] = field(default_factory=lambda: ["sum"])


@dataclass
class FilterConf:
inclusion_filters: Optional[list[dict]] = None
exclusion_filters: Optional[list[dict]] = None


@dataclass
class DruidFetcherConf:
datasource: str
static_filters: Optional[FilterConf] = None
dimensions: list[str] = field(default_factory=list)
aggregations: dict = field(default_factory=dict)
group_by: list[str] = field(default_factory=list)
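
A hypothetical sketch of declaring static filters with the new FilterConf; the dimension names and values are invented, and each dict is expanded into a pydruid Filter(**item) by the Druid connector below:

from numalogic.connectors._config import FilterConf

static_filters = FilterConf(
    inclusion_filters=[{"type": "selector", "dimension": "env", "value": "prod"}],
    exclusion_filters=[{"type": "selector", "dimension": "tier", "value": "canary"}],
)
# This object is what DruidFetcherConf.static_filters (or the static_filters
# argument of DruidFetcher.fetch) expects.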
68 changes: 52 additions & 16 deletions numalogic/connectors/druid/_druid.py
@@ -10,7 +10,7 @@
from pydruid.utils.filters import Filter

from numalogic.connectors._base import DataFetcher
from numalogic.connectors._config import Pivot
from numalogic.connectors._config import Pivot, FilterConf
from typing import Optional, Final

from numalogic.tools.exceptions import DruidFetcherError
@@ -34,13 +34,32 @@ def make_filter_pairs(filter_keys: list[str], filter_values: list[str]) -> dict[
return dict(zip(filter_keys, filter_values))


def _combine_in_filters(filters_list) -> Filter:
return Filter(type="and", fields=[Filter(**item) for item in filters_list])


def _combine_ex_filters(filters_list) -> Filter:
filters = _combine_in_filters(filters_list)
return Filter(type="not", field=filters)


def _make_static_filters(filters: FilterConf) -> Filter:
filter_list = []
if filters.inclusion_filters:
filter_list.append(_combine_in_filters(filters.inclusion_filters))
if filters.exclusion_filters:
filter_list.append(_combine_ex_filters(filters.exclusion_filters))
return Filter(type="and", fields=filter_list)
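
# Illustration only (not part of this commit): for inclusion/exclusion dicts
# like those in the FilterConf sketch earlier, _make_static_filters composes
# pydruid filters roughly as hand-expanded below. Inclusion filters are AND-ed,
# exclusion filters are AND-ed and wrapped in a NOT, and the two groups are
# AND-ed together (build_params later ANDs this with the dynamic filter_pairs).
from pydruid.utils.filters import Filter

inclusion = Filter(type="and", fields=[Filter(type="selector", dimension="env", value="prod")])
exclusion = Filter(
    type="not",
    field=Filter(type="and", fields=[Filter(type="selector", dimension="tier", value="canary")]),
)
static_filter = Filter(type="and", fields=[inclusion, exclusion])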


def build_params(
datasource: str,
dimensions: list[str],
filter_pairs: dict,
granularity: str,
hours: float,
delay: float,
static_filters: Optional[FilterConf] = None,
aggregations: Optional[list[str]] = None,
post_aggregations: Optional[list[str]] = None,
reference_dt: Optional[datetime] = None,
@@ -52,6 +71,7 @@ def build_params(
dimensions: The dimensions to group by
filter_pairs: Indicates which rows of
data to include in the query
static_filters: Static filters passed from config
granularity: Time bucket to aggregate data by hour, day, minute, etc.,
hours: Hours from now to skip training.
delay: Added delay to the fetch query from current time.
@@ -69,6 +89,11 @@
type="and",
fields=[Filter(type="selector", dimension=k, value=v) for k, v in filter_pairs.items()],
)
if static_filters:
_LOGGER.debug("Static Filters are present!")
_static_filters = _make_static_filters(static_filters)
_filter = Filter(type="and", fields=[_static_filters, _filter])

reference_dt = reference_dt or datetime.now(pytz.utc)
end_dt = reference_dt - timedelta(hours=delay)
_LOGGER.debug("Querying with end_dt: %s, that is with delay of %s hrs", end_dt, delay)
@@ -118,6 +143,7 @@ def fetch(
dimensions: list[str],
delay: float = 3.0,
granularity: str = "minute",
static_filters: Optional[FilterConf] = None,
aggregations: Optional[dict] = None,
post_aggregations: Optional[dict] = None,
group_by: Optional[list[str]] = None,
@@ -135,6 +161,7 @@ def fetch(
dimensions: The dimensions to group by
delay: Added delay to the fetch query from current time.
granularity: Time bucket to aggregate data by hour, day, minute, etc.
static_filters: user defined filters
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
post_aggregations: postaggregations map
@@ -152,6 +179,7 @@
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
static_filters=static_filters,
granularity=granularity,
hours=hours,
delay=delay,
@@ -169,12 +197,16 @@
if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
# TODO: performance review
if pivot:
pivoted_frames = []
for idx, column in enumerate(pivot.columns):
_df = df.pivot_table(
index=pivot.index, columns=[column], values=pivot.value, aggfunc=pivot.agg[idx]
)
pivoted_frames.append(_df)

df = pd.concat(pivoted_frames, axis=1, join="outer")
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)
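
The pivot now builds one pivot_table per pivot column, pairing pivot.columns[idx] with pivot.agg[idx], and outer-joins the results. A hypothetical Pivot config matching that expectation (dimension names invented):

from numalogic.connectors._config import Pivot

pivot = Pivot(
    index="timestamp",
    columns=["ciStatus", "region"],  # one pivot_table is built per entry
    value=["count"],
    agg=["sum", "max"],  # agg[idx] is the aggfunc applied for columns[idx]
)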

@@ -193,6 +225,7 @@ def chunked_fetch(
dimensions: list[str],
delay: float = 3.0,
granularity: str = "minute",
static_filter: Optional[FilterConf] = None,
aggregations: Optional[dict] = None,
post_aggregations: Optional[dict] = None,
group_by: Optional[list[str]] = None,
@@ -213,6 +246,7 @@
granularity: Time bucket to aggregate data by hour, day, minute, etc.
aggregations: A map from aggregator name to one of the
``pydruid.utils.aggregators`` e.g., ``doublesum``
static_filter: user defined filters
post_aggregations: postaggregations map
group_by: List of columns to group by
pivot: Pivot configuration
@@ -245,6 +279,7 @@
datasource=datasource,
dimensions=dimensions,
filter_pairs=filter_pairs,
static_filters=static_filter,
granularity=granularity,
hours=min(chunked_hours, hours - hours_elapsed),
delay=delay,
@@ -259,21 +294,22 @@
_LOGGER.debug("Fetching data concurrently with %s threads", max_threads)
with ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = [executor.submit(self._fetch, **params) for params in qparams]
for future in futures:
chunked_dfs.append(future.result())

chunked_dfs.extend(future.result() for future in futures)
df = pd.concat(chunked_dfs, axis=0, ignore_index=True)
df["timestamp"] = pd.to_datetime(df["timestamp"]).astype("int64") // 10**6

if group_by:
df = df.groupby(by=group_by).sum().reset_index()

if pivot and pivot.columns:
df = df.pivot(
index=pivot.index,
columns=pivot.columns,
values=pivot.value,
)
if pivot:
pivoted_frames = []
for idx, column in enumerate(pivot.columns):
_df = df.pivot_table(
index=pivot.index, columns=[column], values=pivot.value, aggfunc=pivot.agg[idx]
)
pivoted_frames.append(_df)

df = pd.concat(pivoted_frames, axis=1, join="outer")
df.columns = df.columns.map("{0[1]}".format)
df.reset_index(inplace=True)

