from typing import Dict, Tuple, List
import numpy as np
import pandas as pd
from lightwood.api.types import TimeseriesSettings
from lightwood.api.dtype import dtype
from lightwood.encoder.time_series.helpers.common import generate_target_group_normalizers
from lightwood.helpers.general import get_group_matches
def timeseries_analyzer(data: pd.DataFrame, dtype_dict: Dict[str, str],
                        timeseries_settings: TimeseriesSettings, target: str) -> Dict:
"""
    This function analyzes (pre-processed) time series data and stores a few useful insights to be used in the rest of Lightwood's pipeline.
:param data: dataframe with time series dataset.
:param dtype_dict: dictionary with inferred types for every column.
:param timeseries_settings: A `TimeseriesSettings` object. For more details, check `lightwood.types.TimeseriesSettings`.
:param target: name of the target column.
The following things are extracted from each time series inside the dataset:
- group_combinations: all observed combinations of values for the set of `group_by` columns. The length of this list determines how many time series are in the data.
    - deltas: inferred sampling interval for each time series.
- ts_naive_residuals: Residuals obtained from the data by a naive forecaster that repeats the last-seen value.
- ts_naive_mae: Mean residual value obtained from the data by a naive forecaster that repeats the last-seen value.
- target_normalizers: objects that may normalize the data within any given time series for effective learning. See `lightwood.encoder.time_series.helpers.common` for available choices.
    :return: Dictionary with the aforementioned insights and the `TimeseriesSettings` object for future reference.
""" # noqa
tss = timeseries_settings
info = {
'original_type': dtype_dict[target],
'data': data[target].values
}
if tss.group_by is not None:
info['group_info'] = {gcol: data[gcol] for gcol in tss.group_by} # group col values
else:
info['group_info'] = {}
# @TODO: maybe normalizers should fit using only the training subsets??
new_data = generate_target_group_normalizers(info)
if dtype_dict[target] in (dtype.integer, dtype.float, dtype.num_tsarray):
naive_forecast_residuals, scale_factor = get_grouped_naive_residuals(info, new_data['group_combinations'])
else:
naive_forecast_residuals, scale_factor = {}, {}
deltas = get_delta(data[tss.order_by],
info,
new_data['group_combinations'],
tss.order_by)
# detect period
periods, freqs = detect_period(deltas, tss)
return {'target_normalizers': new_data['target_normalizers'],
'deltas': deltas,
'tss': tss,
'group_combinations': new_data['group_combinations'],
'ts_naive_residuals': naive_forecast_residuals,
'ts_naive_mae': scale_factor,
'periods': periods,
'sample_freqs': freqs
}
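# Illustrative usage sketch (assumptions: after Lightwood's timeseries transform
# each `order_by` cell holds a window of timestamps, and `tss` is a
# `TimeseriesSettings` instance with `order_by=['Time']` and no grouping; exact
# constructor arguments and helper behavior depend on the Lightwood version in use):
#
#   data = pd.DataFrame({
#       'Time': [np.array([0, 60]), np.array([60, 120]), np.array([120, 180])],
#       'Traffic': [10.0, 20.0, 30.0],
#   })
#   info = timeseries_analyzer(data, {'Traffic': dtype.float}, tss, target='Traffic')
#   info['deltas']['__default']['Time']   # -> 60.0, the inferred sampling interval
#   info['sample_freqs']['__default']     # -> 'minute'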
def get_delta(df: pd.DataFrame, ts_info: dict, group_combinations: list, order_cols: list) -> Dict[str, Dict]:
"""
    Infers the sampling interval of each time series by picking the most popular time interval observed in the training data.
    :param df: Dataframe with the time series data; expected to contain only the `order_by` columns.
:param ts_info: Dictionary used internally by `timeseries_analyzer`. Contains group-wise series information, among other things.
:param group_combinations: all tuples with distinct values for `TimeseriesSettings.group_by` columns, defining all available time series.
:param order_cols: all columns specified in `TimeseriesSettings.order_by`.
:return:
Dictionary with group combination tuples as keys. Values are dictionaries with the inferred delta for each series, for each `order_col`.
""" # noqa
def _most_popular(sorted_keys: pd.Index):
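        # `sorted_keys` is ordered from most to least frequent delta; skip a
        # zero delta (repeated timestamps) if it happens to be the most common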
idx = 0
while sorted_keys[idx] == 0:
if idx == len(sorted_keys) - 1:
break
else:
idx += 1
return sorted_keys[idx]
deltas = {"__default": {}}
# get default delta for all data
for col in order_cols:
series = pd.Series([x[-1] for x in df[col]])
        # By this point df is ordered, so duplicate timestamps come either from
        # not-yet-handled groups or from repeated data; for mode-based delta
        # estimation they should be ignored.
        series = series.drop_duplicates()
rolling_diff = series.rolling(window=2).apply(lambda x: x.iloc[1] - x.iloc[0])
delta = _most_popular(rolling_diff.value_counts(ascending=False).keys()) # pick most popular
deltas["__default"][col] = delta
# get group-wise deltas (if applicable)
if ts_info.get('group_info', False):
original_data = ts_info['data']
for group in group_combinations:
if group != "__default":
for col in order_cols:
ts_info['data'] = pd.Series([x[-1] for x in df[col]])
_, subset = get_group_matches(ts_info, group)
if subset.size > 1:
                        rolling_diff = (pd.Series(subset.squeeze())
                                        .rolling(window=2)
                                        .apply(lambda x: x.iloc[1] - x.iloc[0]))
delta = _most_popular(rolling_diff.value_counts(ascending=False).keys())
if group in deltas:
deltas[group][col] = delta
else:
deltas[group] = {col: delta}
ts_info['data'] = original_data
return deltas
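# Illustrative sketch (hypothetical toy input): for a single ungrouped series
# sampled hourly, where each `df['Time']` cell is a window array whose last
# element is the row's timestamp,
#
#   df = pd.DataFrame({'Time': [np.array([0, 3600]), np.array([3600, 7200]),
#                               np.array([7200, 10800])]})
#   get_delta(df, {'group_info': {}}, [], ['Time'])
#   # -> {'__default': {'Time': 3600.0}}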
def get_naive_residuals(target_data: pd.DataFrame, m: int = 1) -> Tuple[List, float]:
"""
    Computes forecasting residuals for the naive method (the forecast for time `t` is the value observed at `t-m`, which defaults to `t-1`).
Useful for computing MASE forecasting error.
Note: method assumes predictions are all for the same group combination. For a dataframe that contains multiple
    series, use `get_grouped_naive_residuals`.
:param target_data: observed time series targets
    :param m: season length; the naive forecast for each series will be the `m`-th previously seen value
:return: (list of naive residuals, average residual value)
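
    Example (illustrative sketch of the return format, using a toy series)::

        residuals, mae = get_naive_residuals(pd.DataFrame([1, 2, 4, 7]))
        # residuals -> [1.0, 2.0, 3.0]; mae -> 2.0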
""" # noqa
# @TODO: support categorical series as well
residuals = target_data.rolling(window=m + 1).apply(lambda x: abs(x.iloc[m] - x.iloc[0]))[m:].values.flatten()
scale_factor = np.average(residuals)
return residuals.tolist(), scale_factor
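# Note: the `scale_factor` returned above is the denominator used when computing
# MASE, i.e. MASE = mean(|y_t - yhat_t|) / mean(|y_t - y_{t-m}|), so a model
# scoring MASE < 1 improves on the naive forecaster for that series.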
def get_grouped_naive_residuals(info: Dict, group_combinations: List) -> Tuple[Dict, Dict]:
"""
Wraps `get_naive_residuals` for a dataframe with multiple co-existing time series.
""" # noqa
group_residuals = {}
group_scale_factors = {}
for group in group_combinations:
idxs, subset = get_group_matches(info, group)
residuals, scale_factor = get_naive_residuals(pd.DataFrame(subset)) # @TODO: pass m once we handle seasonality
group_residuals[group] = residuals
group_scale_factors[group] = scale_factor
return group_residuals, group_scale_factors
def detect_period(deltas: dict, tss: TimeseriesSettings) -> Tuple[Dict[str, float], Dict[str, str]]:
"""
    Helper method that, based on the most popular interval for a time series, determines its seasonal periodicity (sp).
This bit of information can be crucial for good modelling with methods like ARIMA.
Supported time intervals are:
* 'year'
* 'semestral'
* 'quarter'
* 'bimonthly'
* 'monthly'
* 'weekly'
* 'daily'
* 'hourly'
* 'minute'
* 'second'
Note: all computations assume that the first provided `order_by` column is the one that specifies the sp.
:param deltas: output of `get_delta`, has the most popular interval for each time series.
:param tss: timeseries settings.
    :return: for each time series, 1) a dictionary with its sp and 2) a dictionary with the detected sampling frequency
""" # noqa
interval_to_period = {interval: period for (interval, period) in tss.interval_periods}
secs_to_interval = {
'year': 60 * 60 * 24 * 365,
'semestral': 60 * 60 * 24 * 365 // 2,
'quarter': 60 * 60 * 24 * 365 // 4,
'bimonthly': 60 * 60 * 24 * 365 // 6,
'monthly': 60 * 60 * 24 * 31,
'weekly': 60 * 60 * 24 * 7,
'daily': 60 * 60 * 24,
'hourly': 60 * 60,
'minute': 60,
'second': 1
}
for tag, period in (('year', 1), ('semestral', 2), ('quarter', 4), ('bimonthly', 6), ('monthly', 12),
('weekly', 4), ('daily', 1), ('hourly', 24), ('minute', 1), ('second', 1)):
if tag not in interval_to_period.keys():
interval_to_period[tag] = period
periods = {}
freqs = {}
order_col_idx = 0
for group in deltas.keys():
delta = deltas[group][tss.order_by[order_col_idx]]
diffs = [(tag, abs(delta - secs)) for tag, secs in secs_to_interval.items()]
min_tag, min_diff = sorted(diffs, key=lambda x: x[1])[0]
periods[group] = interval_to_period.get(min_tag, 1)
freqs[group] = min_tag
return periods, freqs
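# Illustrative sketch (assumes `tss.order_by == ['Time']` and a default, empty
# `tss.interval_periods`): for a series sampled once per day,
#
#   deltas = {'__default': {'Time': 60 * 60 * 24}}
#   detect_period(deltas, tss)
#   # -> ({'__default': 1}, {'__default': 'daily'})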