from typing import Dict, List, Optional, Union
import time
import torch
import optuna
import xgboost as xgb
import numpy as np
import pandas as pd
from sklearn.preprocessing import OrdinalEncoder
from type_infer.dtype import dtype
from lightwood.helpers.log import log
from lightwood.helpers.parallelism import get_nr_procs
from lightwood.mixer.base import BaseMixer
from lightwood.encoder.base import BaseEncoder
from lightwood.api.types import PredictionArguments
from lightwood.data.encoded_ds import EncodedDs
optuna.logging.set_verbosity(optuna.logging.CRITICAL)
def check_gpu_support():
try:
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
data = load_iris()
x, _, y, _ = train_test_split(data['data'], data['target'], test_size=.2)
bst = xgb.XGBClassifier(n_estimators=2, max_depth=2, learning_rate=1, objective='binary:logistic',
tree_method='gpu_hist', gpu_id=0)
bst.fit(x, y)
return True
except xgb.core.XGBoostError:
return False
[docs]class XGBoostMixer(BaseMixer):
model: Union[xgb.XGBClassifier, xgb.XGBRegressor]
ordinal_encoder: OrdinalEncoder
label_set: List[str]
max_bin: int
device: torch.device
device_str: str
num_iterations: int
use_optuna: bool
supports_proba: bool
"""
Gradient boosting mixer with an XGBoost backbone.
This mixer is a good all-rounder, due to the generally great performance of tree-based ML algorithms for supervised learning tasks with tabular data.
If you want more information regarding the techniques that set apart XGBoost from other gradient boosters, please refer to their technical paper: "XGBoost: A Scalable Tree Boosting System" (2016).
We can basically think of this mixer as a wrapper to the XGBoost Python package. To do so, there are a few caveats the user may want to be aware about:
* If you seek GPU utilization, XGBoost must be compiled from source instead of being installed through `pip`.
* Integer, float, and quantity `dtype`s are treated as regression tasks with `reg:squarederror` loss. All other supported `dtype`s is casted as a multiclass task with `multi:softmax` loss.
* A partial fit can be performed with the `dev` data split as part of `fit`, if specified with the `fit_on_dev` argument.
There are a couple things in the backlog that will hopefully be added soon:
* An automatic optuna-based hyperparameter search. This procedure triggers when a single iteration of XGBoost is deemed fast enough (given the time budget).
* Support for "unknown class" as a possible answer for multiclass tasks.
""" # noqa
def __init__(
self, stop_after: float, target: str, dtype_dict: Dict[str, str],
input_cols: List[str],
fit_on_dev: bool, use_optuna: bool, target_encoder: BaseEncoder):
"""
:param stop_after: time budget in seconds.
:param target: name of the target column that the mixer will learn to predict.
:param dtype_dict: dictionary with dtypes of all columns in the data.
:param input_cols: list of column names.
:param fit_on_dev: whether to perform a `partial_fit()` at the end of `fit()` using the `dev` data split.
:param use_optuna: whether to activate the automated hyperparameter search (optuna-based). Note that setting this flag to `True` does not guarantee the search will run, rather, the speed criteria will be checked first (i.e., if a single iteration is too slow with respect to the time budget, the search will not take place).
""" # noqa
super().__init__(stop_after)
self.model = None
self.ordinal_encoder = None
self.positive_domain = False
self.label_set = []
self.target = target
self.dtype_dict = dtype_dict
self.input_cols = input_cols
self.use_optuna = use_optuna
self.params = {}
self.fit_on_dev = fit_on_dev
self.cls_dtypes = [dtype.categorical, dtype.binary] # , dtype.cat_tsarray] # TODO
self.float_dtypes = [dtype.float, dtype.quantity] # , dtype.num_tsarray] # TODO
self.num_dtypes = [dtype.integer] + self.float_dtypes
self.supports_proba = dtype_dict[target] in self.cls_dtypes
self.stable = True
self.target_encoder = target_encoder
gpu_works = check_gpu_support()
if not gpu_works:
self.device = torch.device('cpu')
self.device_str = 'cpu'
log.warning('XGBoost running on CPU')
else:
self.device = torch.device('cuda')
self.device_str = 'gpu'
self.max_bin = 255
def _to_dataset(self, ds: EncodedDs, output_dtype: str, mode='train'):
"""
Helper method to wrangle a datasource into the format that the underlying model requires.
:param ds: EncodedDS that contains the dataframe to transform.
:param output_dtype:
:return: modified `data` object that conforms to XGBoost's expected format.
""" # noqa
data = None
for input_col in self.input_cols:
if data is None:
data = ds.get_encoded_column_data(input_col).to(self.device)
else:
enc_col = ds.get_encoded_column_data(input_col).to(self.device)
data = torch.cat((data, enc_col.to(self.device)), 1)
data = data.cpu().numpy()
if mode in ('train', 'dev'):
label_data = ds.get_column_original_data(self.target)
if output_dtype in self.cls_dtypes:
if mode == 'train': # TODO weight maps?
self.ordinal_encoder = OrdinalEncoder()
self.label_set = list(set(label_data))
self.ordinal_encoder.fit(np.array(list(self.label_set)).reshape(-1, 1))
filtered_label_data = []
for x in label_data:
if x in self.label_set:
filtered_label_data.append(x)
label_data = self.ordinal_encoder.transform(np.array(filtered_label_data).reshape(-1, 1)).flatten()
elif output_dtype == dtype.integer:
label_data = label_data.clip(-pow(2, 63), pow(2, 63)).astype(int)
elif output_dtype in self.float_dtypes:
label_data = label_data.astype(float)
return data, label_data
else:
return data
[docs] def fit(self, train_data: EncodedDs, dev_data: EncodedDs) -> None:
"""
Fits the XGBoost model.
:param train_data: encoded features for training dataset
:param dev_data: encoded features for dev dataset
"""
started = time.time()
log.info('Started fitting XGBoost model')
self.fit_data_len = len(train_data)
self.positive_domain = getattr(train_data.encoders.get(self.target, None), 'positive_domain', False)
output_dtype = self.dtype_dict[self.target]
if output_dtype not in self.cls_dtypes + self.num_dtypes:
log.error(f'XGBoost mixer not supported for type: {output_dtype}')
raise Exception(f'XGBoost mixer not supported for type: {output_dtype}')
else:
objective = 'reg:squarederror' if output_dtype in self.num_dtypes else 'multi:softprob'
metric = 'rmse' if output_dtype in self.num_dtypes else 'mlogloss'
self.params = {
'objective': objective,
'eval_metric': metric,
'n_jobs': get_nr_procs(train_data.data_frame),
'process_type': 'default', # normal training
'verbosity': 0,
'early_stopping_rounds': 5
# 'device_type': self.device_str, # TODO
}
# Prepare the data
train_dataset, train_labels = self._to_dataset(train_data, output_dtype, mode='train')
dev_dataset, dev_labels = self._to_dataset(dev_data, output_dtype, mode='dev')
if output_dtype not in self.num_dtypes:
self.all_classes = self.ordinal_encoder.categories_[0]
self.params['num_class'] = self.all_classes.size
model_class = xgb.XGBClassifier
else:
model_class = xgb.XGBRegressor
# Determine time per iterations
start = time.time()
self.params['n_estimators'] = 1
with xgb.config_context(verbosity=0):
self.model = model_class(**self.params)
self.model.fit(train_dataset, train_labels, eval_set=[(dev_dataset, dev_labels)])
end = time.time()
seconds_for_one_iteration = max(0.1, end - start)
self.stop_after = max(1, self.stop_after - (time.time() - started))
# Determine nr of iterations
log.info(f'A single GBM iteration takes {seconds_for_one_iteration} seconds')
self.num_iterations = int(self.stop_after * 0.8 / seconds_for_one_iteration)
# TODO: Turn on grid search if training doesn't take too long using it
# kwargs = {}
# if self.use_optuna and self.num_iterations >= 200:
# model_generator = None
# kwargs['time_budget'] = self.stop_after * 0.4
# self.num_iterations = int(self.num_iterations / 2)
# kwargs['optuna_seed'] = 0
# else:
# model_generator = None
# Train the models
log.info(
f'Training XGBoost with {self.num_iterations} iterations given {self.stop_after} seconds constraint') # noqa
if self.num_iterations < 1:
self.num_iterations = 1
self.params['n_estimators'] = int(self.num_iterations)
# TODO: reinstance these on each run! keep or use via eval_set in fit()?
# self.params['callbacks'] = [(train_dataset, 'train'), (dev_dataset, 'eval')]
with xgb.config_context(verbosity=0):
self.model = model_class(**self.params)
self.model.fit(train_dataset, train_labels, eval_set=[(dev_dataset, dev_labels)])
if self.fit_on_dev:
self.partial_fit(dev_data, train_data)
[docs] def partial_fit(self, train_data: EncodedDs, dev_data: EncodedDs, args: Optional[dict] = None) -> None:
# TODO: https://xgboost.readthedocs.io/en/stable/python/python_api.html#module-xgboost.sklearn
# To resume training from a previous checkpoint, explicitly pass xgb_model argument.
log.info('XGBoost mixer does not have a `partial_fit` implementation')
def __call__(self, ds: EncodedDs,
args: PredictionArguments = PredictionArguments()) -> pd.DataFrame:
"""
Call a trained XGBoost mixer to output predictions for the target column.
:param ds: input data with values for all non-target columns.
:param args: inference-time arguments (e.g. whether to output predicted labels or probabilities).
:return: dataframe with predictions.
""" # noqa
output_dtype = self.dtype_dict[self.target]
xgbdata = self._to_dataset(ds, self.dtype_dict[self.target], mode='predict')
with xgb.config_context(verbosity=0):
if output_dtype in self.num_dtypes:
raw_predictions = self.model.predict(xgbdata)
else:
raw_predictions = self.model.predict_proba(xgbdata)
if self.ordinal_encoder is not None:
decoded_predictions = self.ordinal_encoder.inverse_transform(
np.argmax(raw_predictions, axis=1).reshape(-1, 1)).flatten()
else:
decoded_predictions = raw_predictions
if self.positive_domain:
decoded_predictions = [max(0, p) for p in decoded_predictions]
ydf = pd.DataFrame({'prediction': decoded_predictions})
if args.predict_proba and self.ordinal_encoder is not None:
for idx, label in enumerate(self.ordinal_encoder.categories_[0].tolist()):
ydf[f'__mdb_proba_{label}'] = raw_predictions[:, idx]
return ydf