Source code for lightwood.mixer.sktime

import inspect
import importlib
from copy import deepcopy
from datetime import datetime
from itertools import product
from typing import Dict, Union

import optuna
import numpy as np
import pandas as pd
from sktime.forecasting.compose import TransformedTargetForecaster
from sktime.forecasting.base import ForecastingHorizon, BaseForecaster
from sktime.performance_metrics.forecasting import MeanAbsolutePercentageError
from sktime.forecasting.statsforecast import StatsForecastAutoARIMA as AutoARIMA

from lightwood.helpers.log import log
from lightwood.mixer.base import BaseMixer
from lightwood.api.types import PredictionArguments
from lightwood.helpers.ts import get_group_matches
from lightwood.data.encoded_ds import EncodedDs, ConcatedEncodedDs


[docs]class SkTime(BaseMixer): forecaster: str horizon: int target: str supports_proba: bool model_path: str hyperparam_search: bool def __init__( self, stop_after: float, target: str, dtype_dict: Dict[str, str], horizon: int, ts_analysis: Dict, model_path: str = None, auto_size: bool = True, sp: int = None, hyperparam_search: bool = True, use_stl: bool = False ): """ This mixer is a wrapper around the popular time series library sktime. It exhibits different behavior compared to other forecasting mixers, as it predicts based on indices in a forecasting horizon that is defined with respect to the last seen data point at training time. Due to this, the mixer tries to "fit_on_all" so that the latest point in the validation split marks the difference between training data and where forecasts will start. In practice, you need to specify how much time has passed since the aforementioned timestamp for correct forecasts. By default, it is assumed that predictions are for the very next timestamp post-training. If the task has groups (i.e. 'TimeseriesSettings.group_by' is not empty), the mixer will spawn one forecaster object per each different group observed at training time, plus an additional default forecaster fit with all data. There is an optuna-based automatic hyperparameter search. For now, it considers selecting the forecaster type based on the global SMAPE error across all groups. :param stop_after: time budget in seconds. :param target: column to forecast. :param dtype_dict: dtypes of all columns in the data. :param horizon: length of forecasted horizon. :param sp: seasonality period to enforce (instead of automatic inference done at the `ts_analysis` module) :param ts_analysis: dictionary with miscellaneous time series info, as generated by 'lightwood.data.timeseries_analyzer'. :param model_path: sktime forecaster to use as underlying model(s). Should be a string with format "$module.$class' where '$module' is inside `sktime.forecasting`. Default is 'arima.AutoARIMA'. :param hyperparam_search: bool that indicates whether to perform the hyperparameter tuning or not. :param auto_size: whether to filter out old data points if training split is bigger than a certain threshold (defined by the dataset sampling frequency). Enabled by default to avoid long training times in big datasets. :param use_stl: Whether to use de-trenders and de-seasonalizers fitted in the timeseries analysis phase. """ # noqa super().__init__(stop_after) self.stable = False self.prepared = False self.supports_proba = False self.target = target self.name = 'AutoSKTime' default_possible_models = [ 'croston.Croston', 'theta.ThetaForecaster', 'trend.STLForecaster', 'trend.PolynomialTrendForecaster', ] self.dtype_dict = dtype_dict self.ts_analysis = ts_analysis self.horizon = horizon self.sp = sp self.grouped_by = ['__default'] if not ts_analysis['tss'].group_by else ts_analysis['tss'].group_by self.auto_size = auto_size self.cutoff_factor = 4 # times the detected maximum seasonal period self.use_stl = use_stl # optuna hyperparameter tuning self.models = {} self.cutoffs = {} # last seen timestamp per each model self.study = None self.hyperparam_dict = {} self.model_path = model_path if model_path else 'trend.STLForecaster' self.hyperparam_search = hyperparam_search self.trial_error_fn = MeanAbsolutePercentageError(symmetric=True) self.possible_models = default_possible_models if not model_path else [model_path] self.n_trials = len(self.possible_models) self.freq = self._get_freq(self.ts_analysis['deltas']['__default']) # sktime forecast horizon object is made relative to the end of the latest data point seen at training time # the default assumption is to forecast the next `self.horizon` after said data point self.fh = ForecastingHorizon(np.arange(1, self.horizon + 1), is_relative=True)
[docs] def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None: """ Fits a set of sktime forecasters. The number of models depends on how many groups are observed at training time. Forecaster type can be specified by providing the `model_class` argument in `__init__()`. It can also be determined by hyperparameter optimization based on dev data validation error. """ # noqa log.info(f'Started fitting {self.name} forecaster for array prediction') if self.hyperparam_search: search_space = {'class': self.possible_models} self.study = optuna.create_study(direction='minimize', sampler=optuna.samplers.GridSampler(search_space)) self.study.optimize(lambda trial: self._get_best_model(trial, train_data, dev_data), n_trials=self.n_trials) data = ConcatedEncodedDs([train_data, dev_data]) self._fit(data) else: data = ConcatedEncodedDs([train_data, dev_data]) self._fit(data)
def _fit(self, data): """ Internal method that fits forecasters to a given dataframe. """ df = data.data_frame.sort_values(by=f'__mdb_original_{self.ts_analysis["tss"].order_by}') if not self.hyperparam_search and not self.study: module_name = self.model_path else: finished_study = sum([int(trial.state.is_finished()) for trial in self.study.trials]) == self.n_trials if finished_study: log.info(f'Selected best model: {self.study.best_params["class"]}') module_name = self.study.best_params['class'] else: module_name = self.hyperparam_dict['class'] # select active optuna choice sktime_module = importlib.import_module('.'.join(['sktime', 'forecasting', module_name.split(".")[0]])) try: model_class = getattr(sktime_module, module_name.split(".")[1]) except AttributeError: model_class = AutoARIMA # use AutoARIMA when the provided class does not exist for group in self.ts_analysis['group_combinations']: kwargs = {} sp = self.sp if self.sp else self.ts_analysis['periods'].get(group, '__default')[0] options = { 'sp': sp, # seasonality period 'suppress_warnings': True, # ignore warnings if possible 'error_action': 'raise', # avoids fit() failing silently } if self.model_path == 'fbprophet.Prophet': options['freq'] = self.freq for k, v in options.items(): kwargs = self._add_forecaster_kwarg(model_class, kwargs, k, v) model_pipeline = [("forecaster", model_class(**kwargs))] if self.use_stl and self.ts_analysis['stl_transforms'].get(group, False): model_pipeline.insert(0, ("detrender", self.ts_analysis['stl_transforms'][group]["transformer"].detrender)) model_pipeline.insert(0, ("deseasonalizer", self.ts_analysis['stl_transforms'][group]["transformer"].deseasonalizer)) self.models[group] = TransformedTargetForecaster(model_pipeline) oby_col = self.ts_analysis['tss'].order_by if self.grouped_by == ['__default']: series_data = df series_oby = df[oby_col] self.cutoffs[group] = series_oby.index[-1] # defines the 'present' time for each partition else: series_idxs, series_data = get_group_matches(df, group, self.grouped_by) series_oby = series_data[oby_col] self.cutoffs[group] = series_idxs[-1] # defines the 'present' time for each partition series = series_data[self.target] if series_data.size > self.ts_analysis['tss'].window: series = series.sort_index(ascending=True) series = series.reset_index(drop=True) series = series.loc[~pd.isnull(series.values)] # remove NaN # @TODO: benchmark imputation vs this? if self.model_path == 'fbprophet.Prophet': series = self._transform_index_to_datetime(series, series_oby, options['freq']) series = series.astype(float) # if data is huge, filter out old records for quicker fitting if self.auto_size: cutoff = min(len(series), max(500, options['sp'] * self.cutoff_factor)) series = series.iloc[-cutoff:] try: self.models[group].fit(series, fh=self.fh) except Exception: self.models[group] = model_class() # with default options (i.e. no seasonality, among others) self.models[group].fit(series, fh=self.fh)
[docs] def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None: """ Note: sktime asks for "specification of the time points for which forecasts are requested", and this mixer complies by assuming forecasts will start immediately after the last observed value. Because of this, `ProblemDefinition.fit_on_all` is set to True so that `partial_fit` uses both `dev` and `test` splits to fit the models. Due to how lightwood implements the `update` procedure, expected inputs for this method are: :param dev_data: original `test` split (used to validate and select model if ensemble is `BestOf`). :param train_data: concatenated original `train` and `dev` splits. """ # noqa self.hyperparam_search = False self.fit(dev_data, train_data) self.prepared = True
def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs], args: PredictionArguments = PredictionArguments()) -> pd.DataFrame: """ Calls the mixer to emit forecasts. If there are groups that were not observed at training, a default forecaster (trained on all available data) is used, warning the user that performance might not be optimal. Latest data point in `train_data` passed to `fit()` determines the starting point for predictions. Relative offsets will be automatically determined when predicting for other starting points. """ # noqa if args.predict_proba: log.warning('This mixer does not output probability estimates') df = deepcopy(ds.data_frame) df = df.rename_axis('__sktime_index').reset_index() length = sum(ds.encoded_ds_lenghts) if isinstance(ds, ConcatedEncodedDs) else len(ds) ydf = pd.DataFrame(0, # zero-filled index=df.index, columns=['prediction'], dtype=object) group_values = {gcol: df[gcol].tolist() for gcol in self.grouped_by} \ if self.ts_analysis['tss'].group_by \ else {'': ['__default' for _ in range(length)]} pending_idxs = set(df.index) all_group_combinations = list(product(*[set(x) for x in group_values.values()])) for group in all_group_combinations: group = tuple(group) group = '__default' if group[0] == '__default' else group series_idxs, series_data = get_group_matches(df, group, self.grouped_by) if series_data.size > 0: start_ts = series_data['__sktime_index'].iloc[0] series = series_data[self.target] series_idxs = sorted(series_idxs) if self.models.get(group, False) and self.models[group].is_fitted: freq = self.ts_analysis['deltas'][group] delta = (start_ts - self.cutoffs[group]).total_seconds() offset = round(delta / freq) forecaster = self.models[group] ydf = self._call_groupmodel(ydf, forecaster, series, offset=offset) log.debug(f'[SkTime] Forecasting for group {group}, start at {start_ts} (offset by {offset} for cutoff at {self.cutoffs[group]} (relative {self.models[group].cutoff}))') # noqa else: log.warning(f"Applying naive forecaster for novel group {group}. Performance might not be optimal.") ydf = self._call_default(ydf, series.values, series_idxs) pending_idxs -= set(series_idxs) # apply default model in all remaining novel-group rows if len(pending_idxs) > 0: series = df[self.target][list(pending_idxs)].squeeze() ydf = self._call_default(ydf, series, list(pending_idxs)) return ydf[['prediction']] def _call_groupmodel(self, ydf: pd.DataFrame, model: BaseForecaster, series: pd.Series, offset: int = 0): """ Inner method that calls a `sktime.BaseForecaster`. :param offset: indicates relative offset to the latest data point seen during model training. Cannot be less than the number of training data points + the amount of diffences applied internally by the model. """ # noqa if isinstance(model, TransformedTargetForecaster): submodel = model.steps_[-1][-1] else: submodel = model if hasattr(submodel, '_cutoff') and hasattr(submodel, 'd'): model_d = 0 if submodel.d is None else submodel.d cutoff = submodel._cutoff.values[0] if isinstance(submodel._cutoff, pd.Int64Index) else submodel._cutoff min_offset = -cutoff + model_d + 1 else: min_offset = -np.inf start = max(offset, min_offset) end = series.shape[0] + offset + self.horizon all_preds = model.predict(np.arange(start, end)).tolist() for true_idx, (idx, _) in enumerate(series.items()): start_idx = 0 if max(1 + true_idx + offset, min_offset) < 0 else true_idx end_idx = start_idx + self.horizon ydf['prediction'].loc[idx] = all_preds[start_idx:end_idx] return ydf def _call_default(self, ydf, data, idxs): # last value from each window equals shifted target (by 1) # noqa series = np.array([0] + list(data.flatten())[:-1]) all_preds = [[value for _ in range(self.horizon)] for value in series] ydf['prediction'].iloc[idxs] = all_preds return ydf def _get_best_model(self, trial, train_data, test_data): """ Helper function for Optuna hyperparameter optimization. For now, it uses dev data split to find the best model out of the list specified in self.possible_models. """ self.hyperparam_dict = { 'class': trial.suggest_categorical('class', self.possible_models) } log.info(f'Starting trial with hyperparameters: {self.hyperparam_dict}') try: self._fit(train_data) y_true = test_data.data_frame[self.target].values[:self.horizon] y_pred = pd.DataFrame(self(test_data)['prediction'].iloc[0][:len(y_true)]) error = self.trial_error_fn(y_true, y_pred) except Exception as e: log.debug(e) error = np.inf log.info(f'Trial got error: {error}') return error def _add_forecaster_kwarg(self, forecaster: BaseForecaster, kwargs: dict, key: str, value): """ Adds arguments to the `kwargs` dictionary if the key-value pair is valid for the `forecaster` class signature. """ if key in [p.name for p in inspect.signature(forecaster).parameters.values()]: kwargs[key] = value return kwargs def _transform_index_to_datetime(self, series, series_oby, freq): series_oby = np.array([np.array(lst) for lst in series_oby]) start = datetime.utcfromtimestamp(np.min(series_oby[series_oby != np.min(series_oby)])) series.index = pd.date_range(start=start, freq=freq, normalize=False, periods=series.shape[0]) return series def _get_freq(self, delta): labels = ['S', 'T', 'H', 'D', 'W', 'M', 'Q', 'Y'] secs = [1, 60, 60 * 60, 60 * 60 * 24, 60 * 60 * 24 * 7, 60 * 60 * 24 * 7 * 4, 60 * 60 * 24 * 7 * 4 * 3, 60 * 60 * 24 * 7 * 4 * 12] min_diff = np.argmin(np.abs(np.array(secs) - delta)) return labels[min_diff]