Source code for lightwood.mixer.lightgbm_array

from copy import deepcopy
from typing import Dict, List, Union

import numpy as np
import pandas as pd

from lightwood.helpers.log import log
from lightwood.encoder.base import BaseEncoder
from lightwood.mixer.base import BaseMixer
from lightwood.mixer.lightgbm import LightGBM
from lightwood.api.types import PredictionArguments
from lightwood.data.encoded_ds import EncodedDs, ConcatedEncodedDs


class LightGBMArray(BaseMixer):
    """LightGBM-based model, intended for usage in time series tasks.

    Keeps one :class:`LightGBM` sub-model per step of the forecast horizon.
    Sub-model ``0`` is trained on the target column itself; sub-model ``k``
    (``k > 0``) is trained on the column ``f'{target}_timestep_{k}'`` of the
    dataset's data frame. At inference time the per-step predictions are
    gathered into one list per row.
    """

    models: List[LightGBM]  # one sub-model per forecast step, indexed by timestep
    horizon: int  # number of forecast steps (and of sub-models)
    submodel_stop_after: float  # share of `stop_after` handed to each sub-model
    target: str  # name of the target column in the datasets' data frames
    supports_proba: bool  # always False: no probability estimates are produced

    def __init__(
            self,
            stop_after: float,
            target: str,
            dtype_dict: Dict[str, str],
            input_cols: List[str],
            horizon: int,
            fit_on_dev: bool,
            target_encoder: BaseEncoder):
        """Build ``horizon`` LightGBM sub-models sharing the same configuration.

        :param stop_after: budget forwarded to ``BaseMixer``; it is divided evenly among the sub-models.
        :param target: name of the target column.
        :param dtype_dict: mapping of column name to data type, forwarded to each sub-model.
        :param input_cols: names of the input columns, forwarded to each sub-model.
        :param horizon: for time series tasks, how long the forecast horizon is.
        :param fit_on_dev: forwarded unchanged to each sub-model.
        :param target_encoder: encoder for the target, forwarded to each sub-model.
        """
        super().__init__(stop_after)
        self.submodel_stop_after = stop_after / horizon
        self.target = target
        # @TODO: figure out if forcing `dtype_dict[target] = dtype.float` is needed here
        # NOTE(review): the positional `False` is the sixth LightGBM argument; presumably it
        # turns off hyperparameter search in the sub-models -- confirm against LightGBM.__init__.
        self.models = [
            LightGBM(self.submodel_stop_after, target, dtype_dict, input_cols, fit_on_dev, False, target_encoder)
            for _ in range(horizon)
        ]
        self.horizon = horizon  # for time series tasks, how long is the forecast horizon
        self.supports_proba = False
        self.stable = True

    def _fit_submodels(self, train_data: EncodedDs, dev_data: EncodedDs, method_name: str) -> None:
        """Call ``method_name`` on every sub-model, each against its own timestep target.

        The target column of both data frames is temporarily replaced by the
        matching ``f'{target}_timestep_{k}'`` column. The original target is put
        back even if a sub-model raises, so the caller's datasets are never left
        pointing at a future-timestep column.

        :param train_data: training dataset; its data frame is mutated and then restored.
        :param dev_data: validation dataset; its data frame is mutated and then restored.
        :param method_name: name of the sub-model method to invoke (``'fit'`` or ``'partial_fit'``).
        """
        original_target_train = deepcopy(train_data.data_frame[self.target])
        original_target_dev = deepcopy(dev_data.data_frame[self.target])

        try:
            for timestep, model in enumerate(self.models):
                if timestep > 0:
                    step_column = f'{self.target}_timestep_{timestep}'
                    train_data.data_frame[self.target] = train_data.data_frame[step_column]
                    dev_data.data_frame[self.target] = dev_data.data_frame[step_column]

                getattr(model, method_name)(train_data, dev_data)  # @TODO: this call could be parallelized
        finally:
            # restore target
            train_data.data_frame[self.target] = original_target_train
            dev_data.data_frame[self.target] = original_target_dev

    def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
        """Fit every sub-model on its corresponding timestep target.

        :param train_data: training dataset.
        :param dev_data: validation dataset.
        """
        log.info('Started fitting LGBM models for array prediction')
        self._fit_submodels(train_data, dev_data, 'fit')

    def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
        """Update every sub-model on its corresponding timestep target.

        :param train_data: dataset used to update the sub-models.
        :param dev_data: validation dataset.
        """
        log.info('Updating array of LGBM models...')
        self._fit_submodels(train_data, dev_data, 'partial_fit')

    def __call__(self, ds: Union[EncodedDs, ConcatedEncodedDs],
                 args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
        """Predict all forecast steps for every row of ``ds``.

        :param ds: dataset to predict on.
        :param args: prediction arguments, forwarded to each sub-model. If
            ``predict_proba`` is set, a warning is logged because no
            probability estimates are produced.
        :return: data frame with a single ``'prediction'`` column in which each
            row holds a list of ``horizon`` values, one per forecast step.
        """
        if args.predict_proba:
            log.warning('This model does not output probability estimates')

        length = sum(ds.encoded_ds_lenghts) if isinstance(ds, ConcatedEncodedDs) else len(ds)
        ydf = pd.DataFrame(0,  # zero-filled
                           index=np.arange(length),
                           columns=[f'prediction_{i}' for i in range(self.horizon)])

        for timestep in range(self.horizon):
            ydf[f'prediction_{timestep}'] = self.models[timestep](ds, args)['prediction']

        # Collapse the per-step columns into one list per row (done before the
        # 'prediction' column exists, so only the per-step values are included).
        ydf['prediction'] = ydf.values.tolist()
        return ydf[['prediction']]