from copy import deepcopy
from itertools import product
from typing import Dict, Tuple
from types import SimpleNamespace
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from lightwood.api.dtype import dtype
from lightwood.api.types import PredictionArguments
from lightwood.helpers.ts import add_tn_conf_bounds
from lightwood.data import EncodedDs
from lightwood.analysis.base import BaseAnalysisBlock
from lightwood.analysis.nc.norm import Normalizer
from lightwood.analysis.nc.icp import IcpRegressor, IcpClassifier
from lightwood.analysis.nc.base import CachedRegressorAdapter, CachedClassifierAdapter
from lightwood.analysis.nc.nc import BoostedAbsErrorErrFunc, RegressorNc, ClassifierNc, MarginErrFunc
from lightwood.analysis.nc.util import clean_df, set_conf_range, get_numeric_conf_range, \
get_categorical_conf, get_anomalies


class ICP(BaseAnalysisBlock):
""" Confidence estimation block, uses inductive conformal predictors (ICPs) for model agnosticity """
def __init__(self,
fixed_significance: float,
positive_domain: bool,
confidence_normalizer: bool
):
super().__init__()
self.fixed_significance = fixed_significance
self.positive_domain = positive_domain
self.confidence_normalizer = confidence_normalizer
        self.validation_size = 100  # determines the size of the nonconformity score arrays (has a sizable impact on runtime)
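
    # A minimal usage sketch (hypothetical argument values; in practice, Lightwood
    # wires this block up automatically through its JsonAI analysis configuration):
    #   block = ICP(fixed_significance=0.05, positive_domain=False, confidence_normalizer=False)
    #   info = block.analyze(info, **analysis_kwargs)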

    def analyze(self, info: Dict[str, object], **kwargs) -> Dict[str, object]:
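        """
        Fits an inductive conformal predictor (plus one per group, for grouped time
        series tasks) and calibrates it on the validation data, storing the resulting
        confidence estimates and bounds in the returned `info` dictionary.
        """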
ns = SimpleNamespace(**kwargs)
data_type = ns.dtype_dict[ns.target]
output = {'icp': {'__mdb_active': False}}
fit_params = {'horizon': ns.tss.horizon or 0, 'columns_to_ignore': []}
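        # ignore the extra `timestep_{i}` target columns generated for multi-step forecasts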
fit_params['columns_to_ignore'].extend([f'timestep_{i}' for i in range(1, fit_params['horizon'])])
if ns.is_classification:
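            # recover the class labels, either from the predictor's probability columns
            # or from the target encoder's reverse mapping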
if ns.predictor.supports_proba:
all_cat_cols = [col for col in ns.normal_predictions.columns
if '__mdb_proba' in col and '__mdb_unknown_cat' not in col]
all_classes = np.array([col.replace('__mdb_proba_', '') for col in all_cat_cols])
else:
class_keys = sorted(ns.encoded_val_data.encoders[ns.target].rev_map.keys())
all_classes = np.array([ns.encoded_val_data.encoders[ns.target].rev_map[idx] for idx in class_keys])
if data_type != dtype.tags:
enc = OneHotEncoder(sparse=False, handle_unknown='ignore')
enc.fit(all_classes.reshape(-1, 1))
output['label_encoders'] = enc # needed to repr cat labels inside nonconformist
else:
output['label_encoders'] = None
adapter = CachedClassifierAdapter
nc_function = MarginErrFunc()
nc_class = ClassifierNc
icp_class = IcpClassifier
else:
adapter = CachedRegressorAdapter
nc_function = BoostedAbsErrorErrFunc()
nc_class = RegressorNc
icp_class = IcpRegressor
result_df = pd.DataFrame()
if ns.is_numerical or (ns.is_classification and data_type != dtype.tags):
model = adapter(ns.predictor)
norm_params = {'target': ns.target, 'dtype_dict': ns.dtype_dict, 'predictor': ns.predictor,
'encoders': ns.encoded_val_data.encoders, 'is_multi_ts': ns.is_multi_ts, 'stop_after': 1e2}
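            # optionally fit a normalizer (a "self-aware" error estimator) used to scale
            # nonconformity scores on a per-instance basis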
if self.confidence_normalizer:
normalizer = Normalizer(fit_params=norm_params)
normalizer.fit(ns.train_data)
normalizer.prediction_cache = normalizer(ns.encoded_val_data, args=PredictionArguments())
else:
normalizer = None
            # instantiate the ICP
nc = nc_class(model, nc_function, normalizer=normalizer)
icp = icp_class(nc, cal_size=self.validation_size)
output['icp']['__default'] = icp
            # set up the prediction cache to avoid additional .predict() calls
if ns.is_classification:
if ns.predictor.supports_proba:
icp.nc_function.model.prediction_cache = ns.normal_predictions[all_cat_cols].values
else:
if ns.is_multi_ts:
icp.nc_function.model.prediction_cache = np.array(
[p[0] for p in ns.normal_predictions['prediction']])
preds = icp.nc_function.model.prediction_cache
else:
preds = ns.normal_predictions['prediction']
predicted_classes = pd.get_dummies(preds).values # inflate to one-hot enc
icp.nc_function.model.prediction_cache = predicted_classes
elif ns.is_multi_ts:
                # for time series, we fit confidence bounds only for the t+1 forecast
icp.nc_function.model.prediction_cache = np.array([p[0] for p in ns.normal_predictions['prediction']])
else:
icp.nc_function.model.prediction_cache = np.array(ns.normal_predictions['prediction'])
if not ns.is_classification:
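                # save the training target's standard deviation, used later for bound width selection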
output['df_target_stddev'] = {'__default': ns.stats_info.df_target_stddev}
# fit additional ICPs in time series tasks with grouped columns
if ns.tss.is_timeseries and ns.tss.group_by:
# create an ICP for each possible group
group_info = ns.data[ns.tss.group_by].to_dict('list')
all_group_combinations = list(product(*[set(x) for x in group_info.values()]))
output['icp']['__mdb_groups'] = all_group_combinations
output['icp']['__mdb_group_keys'] = [x for x in group_info.keys()]
for combination in all_group_combinations:
output['icp'][tuple(combination)] = deepcopy(icp)
# calibrate ICP
icp_df = deepcopy(ns.data)
icp_df, y = clean_df(icp_df, ns.target, ns.is_classification, output.get('label_encoders', None))
output['icp']['__default'].index = icp_df.columns
output['icp']['__default'].calibrate(icp_df.values, y)
# get confidence estimation for validation dataset
conf, ranges = set_conf_range(
icp_df, icp, ns.dtype_dict[ns.target],
output, positive_domain=self.positive_domain, significance=self.fixed_significance)
if not ns.is_classification:
result_df = pd.DataFrame(index=ns.data.index, columns=['confidence', 'lower', 'upper'], dtype=float)
result_df.loc[icp_df.index, 'lower'] = ranges[:, 0]
result_df.loc[icp_df.index, 'upper'] = ranges[:, 1]
else:
result_df = pd.DataFrame(index=ns.data.index, columns=['confidence'], dtype=float)
result_df.loc[icp_df.index, 'confidence'] = conf
# calibrate additional grouped ICPs
if ns.tss.is_timeseries and ns.tss.group_by:
icps = output['icp']
group_keys = icps['__mdb_group_keys']
# add all predictions to DF
icps_df = deepcopy(ns.data)
if ns.is_multi_ts:
icps_df[f'__predicted_{ns.target}'] = [p[0] for p in ns.normal_predictions['prediction']]
else:
icps_df[f'__predicted_{ns.target}'] = ns.normal_predictions['prediction']
for group in icps['__mdb_groups']:
icp_df = icps_df
# filter irrelevant rows for each group combination
icp_df['__mdb_norm_index'] = np.arange(len(icp_df))
for key, val in zip(group_keys, group):
icp_df = icp_df[icp_df[key] == val]
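                    # recompute normalizer scores over just this group's rows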
if icps[tuple(group)].nc_function.normalizer is not None:
group_normalizer = icps[tuple(group)].nc_function.normalizer
norm_input_df = ns.encoded_val_data.data_frame.iloc[icp_df.pop('__mdb_norm_index')]
norm_input = EncodedDs(ns.encoded_val_data.encoders, norm_input_df, ns.target)
norm_cache = group_normalizer(norm_input, args=PredictionArguments())
icp_df[f'__norm_{ns.target}'] = norm_cache
# save relevant predictions in the caches, then calibrate the ICP
pred_cache = icp_df.pop(f'__predicted_{ns.target}').values
icps[tuple(group)].nc_function.model.prediction_cache = pred_cache
icp_df, y = clean_df(icp_df, ns.target, ns.is_classification, output.get('label_encoders', None))
if icps[tuple(group)].nc_function.normalizer is not None:
icps[tuple(group)].nc_function.normalizer.prediction_cache = icp_df.pop(
f'__norm_{ns.target}').values
icps[tuple(group)].index = icp_df.columns # important at inference time
icps[tuple(group)].calibrate(icp_df.values, y)
# save training std() for bounds width selection
if not ns.is_classification:
icp_train_df = ns.data
for key, val in zip(group_keys, group):
icp_train_df = icp_train_df[icp_train_df[key] == val]
y_train = icp_train_df[ns.target].values
output['df_target_stddev'][tuple(group)] = y_train.std()
# get bounds for relevant rows in validation dataset
conf, group_ranges = set_conf_range(
icp_df, icps[tuple(group)],
ns.dtype_dict[ns.target],
output, group=tuple(group),
positive_domain=self.positive_domain, significance=self.fixed_significance)
# save group bounds
if not ns.is_classification:
result_df.loc[icp_df.index, 'lower'] = group_ranges[:, 0]
result_df.loc[icp_df.index, 'upper'] = group_ranges[:, 1]
result_df.loc[icp_df.index, 'confidence'] = conf
# consolidate all groups here
output['icp']['__mdb_active'] = True
output['result_df'] = result_df
info = {**info, **output}
return info

    def explain(self, row_insights: pd.DataFrame, global_insights: Dict[str, object],
**kwargs) -> Tuple[pd.DataFrame, Dict[str, object]]:
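        """
        Applies the calibrated ICPs to produce per-row confidence scores and, for
        numerical targets, prediction bounds, which are added to `row_insights`.
        """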
ns = SimpleNamespace(**kwargs)
if ns.analysis['icp']['__mdb_active']:
icp_X = deepcopy(ns.data)
# replace observed data w/predictions
preds = ns.predictions['prediction']
if ns.tss.is_timeseries and ns.tss.horizon > 1:
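                # multi-step forecasts: confidence is estimated for the first step (t+1) only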
preds = [p[0] for p in preds]
for col in [f'timestep_{i}' for i in range(1, ns.tss.horizon)]:
if col in icp_X.columns:
icp_X.pop(col) # erase ignorable columns
icp_X[ns.target_name] = preds
is_categorical = ns.target_dtype in (dtype.binary, dtype.categorical, dtype.cat_array, dtype.cat_tsarray)
is_numerical = ns.target_dtype in (dtype.integer, dtype.float,
dtype.quantity, dtype.num_array, dtype.num_tsarray)
is_anomaly_task = is_numerical and ns.tss.is_timeseries and ns.anomaly_detection
if (is_numerical or is_categorical) and ns.analysis['icp'].get('__mdb_active', False):
base_icp = ns.analysis['icp']['__default']
                # reorder columns to match the feature order stored in the ICP
index = base_icp.index.values
index = np.append(index, ns.target_name) if ns.target_name not in index else index
icp_X = icp_X.reindex(columns=index) # important, else bounds can be invalid
# only one normalizer, even if it's a grouped time series task
normalizer = base_icp.nc_function.normalizer
if normalizer:
                    normalizer.prediction_cache = normalizer(ns.encoded_data, args=PredictionArguments())
icp_X['__mdb_selfaware_scores'] = normalizer.prediction_cache
# get ICP predictions
result_cols = ['lower', 'upper', 'significance'] if is_numerical else ['significance']
result = pd.DataFrame(index=icp_X.index, columns=result_cols)
# base ICP
X = deepcopy(icp_X)
                # calling `.values` repeatedly slowed this function down; cache it in a local variable instead
icp_values = X.values
# get all possible ranges
                if is_numerical:
                    # for multi-step time series, bounds are only given for the first forecast
                    base_icp.nc_function.model.prediction_cache = preds
                    all_confs = base_icp.predict(icp_values)
# categorical
else:
                    predicted_proba = any('__mdb_proba' in col for col in ns.predictions.columns)
if predicted_proba:
all_cat_cols = [col for col in ns.predictions.columns
if '__mdb_proba' in col and '__mdb_unknown_cat' not in col]
class_dists = ns.predictions[all_cat_cols].values
for icol, cat_col in enumerate(all_cat_cols):
row_insights.loc[X.index, cat_col] = class_dists[:, icol]
else:
class_dists = pd.get_dummies(preds).values
base_icp.nc_function.model.prediction_cache = class_dists
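                    # reorder prediction axes so that samples run along the first dimension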
all_ranges = np.array([base_icp.predict(icp_values)])
all_confs = np.swapaxes(np.swapaxes(all_ranges, 0, 2), 0, 1)
# convert (B, 2, 99) into (B, 2) given width or error rate constraints
if is_numerical:
significances, confs = get_numeric_conf_range(all_confs,
df_target_stddev=ns.analysis['df_target_stddev'],
positive_domain=self.positive_domain,
fixed_conf=ns.pred_args.fixed_confidence)
result.loc[X.index, 'lower'] = confs[:, 0]
result.loc[X.index, 'upper'] = confs[:, 1]
else:
significances = get_categorical_conf(all_confs.squeeze())
result.loc[X.index, 'significance'] = significances
            # for grouped time series, replace bounds in rows that have a trained group ICP
if ns.analysis['icp'].get('__mdb_groups', False):
icps = ns.analysis['icp']
group_keys = icps['__mdb_group_keys']
for group in icps['__mdb_groups']:
icp = icps[tuple(group)]
# check ICP has calibration scores
if icp.cal_scores[0].shape[0] > 0:
# filter rows by group
X = deepcopy(icp_X)
for key, val in zip(group_keys, group):
X = X[X[key] == val]
if X.size > 0:
# set ICP caches
icp.nc_function.model.prediction_cache = X.pop(ns.target_name).values
if icp.nc_function.normalizer:
icp.nc_function.normalizer.prediction_cache = X.pop('__mdb_selfaware_scores').values
# predict and get confidence level given width or error rate constraints
if is_numerical:
all_confs = icp.predict(X.values)
fixed_conf = ns.pred_args.fixed_confidence
significances, confs = get_numeric_conf_range(
all_confs,
df_target_stddev=ns.analysis['df_target_stddev'],
positive_domain=self.positive_domain,
group=tuple(group),
fixed_conf=fixed_conf
)
# only replace where grouped ICP is more informative (i.e. tighter)
                                if fixed_conf is None:
                                    default_widths = result.loc[X.index, 'upper'] - result.loc[X.index, 'lower']
                                    grouped_widths = np.subtract(confs[:, 1], confs[:, 0])
                                    tighter = default_widths.values > grouped_widths  # boolean mask
                                    insert_index = default_widths.index[tighter]
                                    result.loc[insert_index, 'lower'] = confs[tighter, 0]
                                    result.loc[insert_index, 'upper'] = confs[tighter, 1]
                                    result.loc[insert_index, 'significance'] = significances[tighter]
else:
all_ranges = np.array([icp.predict(X.values)])
all_confs = np.swapaxes(np.swapaxes(all_ranges, 0, 2), 0, 1)
significances = get_categorical_conf(all_confs)
result.loc[X.index, 'significance'] = significances
row_insights['confidence'] = result['significance'].astype(float).tolist()
if is_numerical:
row_insights['lower'] = result['lower'].astype(float)
row_insights['upper'] = result['upper'].astype(float)
# anomaly detection
if is_anomaly_task:
anomalies = get_anomalies(row_insights,
ns.data[ns.target_name],
cooldown=ns.pred_args.anomaly_cooldown)
row_insights['anomaly'] = anomalies
if ns.tss.is_timeseries and ns.tss.horizon > 1 and is_numerical:
row_insights = add_tn_conf_bounds(row_insights, ns.tss)
# clip bounds if necessary
if is_numerical:
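                # pow(2, 62) serves as a wide finite stand-in for +/- infinity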
lower_limit = 0.0 if ns.positive_domain else -pow(2, 62)
upper_limit = pow(2, 62)
if not (ns.tss.is_timeseries and ns.tss.horizon > 1):
row_insights['upper'] = row_insights['upper'].clip(lower_limit, upper_limit)
row_insights['lower'] = row_insights['lower'].clip(lower_limit, upper_limit)
else:
row_insights['upper'] = [np.array(row).clip(lower_limit, upper_limit).tolist()
for row in row_insights['upper']]
row_insights['lower'] = [np.array(row).clip(lower_limit, upper_limit).tolist()
for row in row_insights['lower']]
# Make sure the target and real values are of an appropriate type
if ns.tss.is_timeseries and ns.tss.horizon > 1:
            # array outputs that were not originally of an array dtype are left as-is;
            # whether they need extra handling is still an open question
            pass
        elif ns.target_dtype in (dtype.integer,):
row_insights['prediction'] = row_insights['prediction'].astype(int)
row_insights['upper'] = row_insights['upper'].astype(int)
row_insights['lower'] = row_insights['lower'].astype(int)
elif ns.target_dtype in (dtype.float, dtype.quantity):
row_insights['prediction'] = row_insights['prediction'].astype(float)
row_insights['upper'] = row_insights['upper'].astype(float)
row_insights['lower'] = row_insights['lower'].astype(float)
elif ns.target_dtype in (dtype.short_text, dtype.rich_text, dtype.binary, dtype.categorical):
row_insights['prediction'] = row_insights['prediction'].astype(str)
return row_insights, global_insights