Source code for lightwood.data.timeseries_transform

import copy
import datetime
import dateutil
import numpy as np
import pandas as pd
import multiprocessing as mp
from lightwood.helpers.parallelism import get_nr_procs
from functools import partial
from typing import Dict
from lightwood.api.types import TimeseriesSettings
from lightwood.helpers.log import log
from lightwood.api import dtype


[docs]def transform_timeseries( data: pd.DataFrame, dtype_dict: Dict[str, str], timeseries_settings: TimeseriesSettings, target: str, mode: str) -> pd.DataFrame: """ Block that transforms the dataframe of a time series task to a convenient format for use in posterior phases like model training. The main transformations performed by this block are: - Type casting (e.g. to numerical for `order_by` columns). - Windowing functions for historical context based on `TimeseriesSettings.window` parameter. - Explicitly add target columns according to the `TimeseriesSettings.horizon` parameter. - Flag all rows that are "predictable" based on all `TimeseriesSettings`. - Plus, handle all logic for the streaming use case (where forecasts are only emitted for the last observed data point). :param data: Dataframe with data to transform. :param dtype_dict: Dictionary with the types of each column. :param timeseries_settings: A `TimeseriesSettings` object. :param target: The name of the target column to forecast. :param mode: Either "train" or "predict", depending on what phase is calling this procedure. :return: A dataframe with all the transformations applied. """ # noqa tss = timeseries_settings original_df = copy.deepcopy(data) gb_arr = tss.group_by if tss.group_by is not None else [] ob_arr = tss.order_by window = tss.window if tss.use_previous_target and target not in data.columns: raise Exception(f"Cannot transform. Missing historical values for target column {target} (`use_previous_target` is set to True).") # noqa for hcol in tss.historical_columns: if hcol not in data.columns or data[hcol].isna().any(): raise Exception(f"Cannot transform. Missing values in historical column {hcol}.") if '__mdb_make_predictions' in original_df.columns: index = original_df[original_df['__mdb_make_predictions'].map( {'True': True, 'False': False, True: True, False: False}).isin([True])] infer_mode = index.shape[0] == 0 # condition to trigger: __mdb_make_predictions is set to False everywhere # @TODO: dont drop and use instead of original_index? original_df = original_df.reset_index(drop=True) if infer_mode else original_df else: infer_mode = False original_index_list = [] idx = 0 for row in original_df.itertuples(): if _make_pred(row) or infer_mode: original_index_list.append(idx) idx += 1 else: original_index_list.append(None) original_df['original_index'] = original_index_list secondary_type_dict = {} for col in ob_arr: if dtype_dict[col] in (dtype.date, dtype.integer, dtype.float): secondary_type_dict[col] = dtype_dict[col] # Convert order_by columns to numbers (note, rows are references to mutable rows in `original_df`) for i, row in original_df.iterrows(): for col in ob_arr: # @TODO: Remove if the TS encoder can handle `None` if row[col] is None or pd.isna(row[col]): original_df.at[i, col] = 0.0 else: if dtype_dict[col] == dtype.date: try: row[col] = dateutil.parser.parse( row[col], **{} ) except (TypeError, ValueError): pass if isinstance(row[col], datetime.datetime): row[col] = row[col].timestamp() try: row[col] = float(row[col]) except ValueError: raise ValueError(f'Failed to order based on column: "{col}" due to faulty value: {row[col]}') for oby in tss.order_by: original_df[f'__mdb_original_{oby}'] = original_df[oby] group_lengths = [] if len(gb_arr) > 0: df_arr = [] for _, df in original_df.groupby(gb_arr): df_arr.append(df.sort_values(by=ob_arr)) group_lengths.append(len(df)) else: df_arr = [original_df.sort_values(by=ob_arr)] group_lengths.append(len(original_df)) n_groups = len(df_arr) last_index = original_df['original_index'].max() for i, subdf in enumerate(df_arr): if '__mdb_make_predictions' in subdf.columns and mode == 'predict': if infer_mode: df_arr[i] = _ts_infer_next_row(subdf, ob_arr, last_index) last_index += 1 if len(original_df) > 500: # @TODO: restore possibility to override this with args nr_procs = get_nr_procs(original_df) log.info(f'Using {nr_procs} processes to reshape.') pool = mp.Pool(processes=nr_procs) # Make type `object` so that dataframe cells can be python lists df_arr = pool.map(partial(_ts_to_obj, historical_columns=ob_arr + tss.historical_columns), df_arr) df_arr = pool.map(partial(_ts_order_col_to_cell_lists, order_cols=ob_arr + tss.historical_columns), df_arr) df_arr = pool.map( partial( _ts_add_previous_rows, order_cols=ob_arr + tss.historical_columns, window=window), df_arr) df_arr = pool.map(partial(_ts_add_future_target, target=target, horizon=tss.horizon, data_dtype=tss.target_type, mode=mode), df_arr) if tss.use_previous_target: df_arr = pool.map( partial(_ts_add_previous_target, target=target, window=tss.window), df_arr) pool.close() pool.join() else: for i in range(n_groups): df_arr[i] = _ts_to_obj(df_arr[i], historical_columns=ob_arr + tss.historical_columns) df_arr[i] = _ts_order_col_to_cell_lists(df_arr[i], order_cols=ob_arr + tss.historical_columns) df_arr[i] = _ts_add_previous_rows(df_arr[i], order_cols=ob_arr + tss.historical_columns, window=window) df_arr[i] = _ts_add_future_target(df_arr[i], target=target, horizon=tss.horizon, data_dtype=tss.target_type, mode=mode) if tss.use_previous_target: df_arr[i] = _ts_add_previous_target(df_arr[i], target=target, window=tss.window) combined_df = pd.concat(df_arr) if '__mdb_make_predictions' in combined_df.columns: combined_df = pd.DataFrame(combined_df[combined_df['__mdb_make_predictions'].astype(bool).isin([True])]) del combined_df['__mdb_make_predictions'] if not infer_mode and any([i < tss.window for i in group_lengths]): if tss.allow_incomplete_history: log.warning("Forecasting with incomplete historical context, predictions might be subpar") else: raise Exception(f'Not enough historical context to make a timeseries prediction (`allow_incomplete_history` is set to False). Please provide a number of rows greater or equal to the window size - currently (number_rows, window_size) = ({min(group_lengths)}, {tss.window}). If you can\'t get enough rows, consider lowering your window size. If you want to force timeseries predictions lacking historical context please set the `allow_incomplete_history` timeseries setting to `True`, but this might lead to subpar predictions depending on the mixer.') # noqa df_gb_map = None if n_groups > 1: df_gb_list = list(combined_df.groupby(tss.group_by)) df_gb_map = {} for gb, df in df_gb_list: df_gb_map['_' + '_'.join(str(gb))] = df timeseries_row_mapping = {} idx = 0 if df_gb_map is None: for _, row in combined_df.iterrows(): if not infer_mode: timeseries_row_mapping[idx] = int( row['original_index']) if row['original_index'] is not None and not np.isnan( row['original_index']) else None else: timeseries_row_mapping[idx] = idx idx += 1 else: for gb in df_gb_map: for _, row in df_gb_map[gb].iterrows(): if not infer_mode: timeseries_row_mapping[idx] = int( row['original_index']) if row['original_index'] is not None and not np.isnan( row['original_index']) else None else: timeseries_row_mapping[idx] = idx idx += 1 del combined_df['original_index'] return combined_df
def _ts_infer_next_row(df: pd.DataFrame, ob: str, last_index: int) -> pd.DataFrame: """ Adds an inferred next row for streaming mode purposes. :param df: dataframe from which next row is inferred. :param ob: `order_by` column. :param last_index: index number of the latest row in `df`. :return: Modified `df` with the inferred row appended to it. """ last_row = df.iloc[[-1]].copy() if df.shape[0] > 1: butlast_row = df.iloc[[-2]] delta = (last_row[ob].values - butlast_row[ob].values).flatten()[0] else: delta = 1 last_row.original_index = None last_row.index = [last_index + 1] last_row['__mdb_make_predictions'] = True last_row['__mdb_ts_inferred'] = True last_row[ob] += delta return df.append(last_row) def _make_pred(row) -> bool: """ Indicates whether a prediction should be made for `row` or not. """ return not hasattr(row, '__mdb_make_predictions') or row.make_predictions def _ts_to_obj(df: pd.DataFrame, historical_columns: list) -> pd.DataFrame: """ Casts all historical columns in a dataframe to `object` type. :param df: Input dataframe :param historical_columns: Historical columns to type cast :return: Dataframe with `object`-typed historical columns """ for hist_col in historical_columns: df.loc[:, hist_col] = df[hist_col].astype(object) return df def _ts_order_col_to_cell_lists(df: pd.DataFrame, order_cols: list) -> pd.DataFrame: """ Casts all data in `order_by` columns into cells. :param df: Input dataframe :param order_cols: `order_by` columns :return: Dataframe with all `order_cols` modified so that their values are cells, e.g. `1` -> `[1]` """ for order_col in order_cols: for ii in range(len(df)): label = df.index.values[ii] df.at[label, order_col] = [df.at[label, order_col]] return df def _ts_add_previous_rows(df: pd.DataFrame, order_cols: list, window: int) -> pd.DataFrame: """ Adds previous rows (as determined by `TimeseriesSettings.window`) into the cells of all `order_by` columns. :param df: Input dataframe. :param order_cols: `order_by` columns. :param window: value of `TimeseriesSettings.window` parameter. :return: Dataframe with all `order_cols` modified so that their values are now arrays of historical context. """ # noqa for order_col in order_cols: for i in range(len(df)): previous_indexes = [*range(max(0, i - window), i)] for prev_i in reversed(previous_indexes): df.iloc[i][order_col].append( df.iloc[prev_i][order_col][-1] ) # Zero pad # @TODO: Remove since RNN encoder can do without (???) df.iloc[i][order_col].extend( [0] * (1 + window - len(df.iloc[i][order_col])) ) df.iloc[i][order_col].reverse() return df def _ts_add_previous_target(df: pd.DataFrame, target: str, window: int) -> pd.DataFrame: """ Adds previous rows (as determined by `TimeseriesSettings.window`) into the cells of the target column. :param df: Input dataframe. :param target: target column name. :param window: value of `TimeseriesSettings.window` parameter. :return: Dataframe with new `__mdb_ts_previous_{target}` column that contains historical target context. """ # noqa if target not in df: return df previous_target_values = list(df[target]) del previous_target_values[-1] previous_target_values = [None] + previous_target_values previous_target_values_arr = [] for i in range(len(previous_target_values)): prev_vals = previous_target_values[max(i - window, 0):i + 1] arr = [None] * (window - len(prev_vals) + 1) arr.extend(prev_vals) previous_target_values_arr.append(arr) df[f'__mdb_ts_previous_{target}'] = previous_target_values_arr return df def _ts_add_future_target(df, target, horizon, data_dtype, mode): """ Adds as many columns to the input dataframe as the forecasting horizon asks for (as determined by `TimeseriesSettings.horizon`). :param df: Input dataframe. :param target: target column name. :param horizon: value of `TimeseriesSettings.horizon` parameter. :param data_dtype: dictionary with types of all input columns :param mode: either "train" or "predict". `Train` will drop rows with incomplet target info. `Predict` has no effect, for now. :return: Dataframe with new `{target}_timestep_{i}'` columns that contains target labels at timestep `i` of a total `TimeseriesSettings.horizon`. """ # noqa if target not in df: return df if data_dtype in (dtype.integer, dtype.float, dtype.num_array, dtype.num_tsarray): df[target] = df[target].astype(float) for timestep_index in range(1, horizon): next_target_value_arr = list(df[target]) for del_index in range(0, min(timestep_index, len(next_target_value_arr))): del next_target_value_arr[0] next_target_value_arr.append(None) col_name = f'{target}_timestep_{timestep_index}' df[col_name] = next_target_value_arr df[col_name] = df[col_name].fillna(value=np.nan) return df