Source code for lightwood.data.cleaner

import re
import datetime
from copy import deepcopy
from dateutil.parser import parse as parse_dt
from typing import Dict, List, Optional, Tuple, Callable, Union

import numpy as np
import pandas as pd

from lightwood.api.dtype import dtype
from lightwood.helpers import text
from lightwood.helpers.log import log
from lightwood.helpers.imputers import BaseImputer
from lightwood.api.types import TimeseriesSettings
from lightwood.helpers.numeric import is_nan_numeric


def cleaner(
    data: pd.DataFrame,
    dtype_dict: Dict[str, str],
    pct_invalid: float,
    identifiers: Dict[str, str],
    target: str,
    mode: str,
    timeseries_settings: TimeseriesSettings,
    anomaly_detection: bool,
    imputers: Dict[str, BaseImputer] = {},
    custom_cleaning_functions: Dict[str, str] = {}
) -> pd.DataFrame:
    """
    The cleaner is a function which takes in the raw data, plus additional information about its types and about the problem. Based on this it generates a "clean" representation of the data, where each column has an ideal standardized type and all malformed, missing or otherwise invalid elements are turned into ``None``. Optionally, these ``None`` values can be replaced with imputers.

    :param data: The raw data
    :param dtype_dict: Type information for each column
    :param pct_invalid: How much of each column can be invalid
    :param identifiers: A dict containing all identifier typed columns
    :param target: The target column
    :param mode: Can be "predict" or "train"
    :param imputers: Each key corresponds to the single input column that will be imputed by the respective object. Refer to the imputer documentation for more details.
    :param timeseries_settings: Timeseries related settings, only relevant for timeseries predictors, otherwise can be the default object
    :param anomaly_detection: Are we detecting anomalies with this predictor?

    :returns: The cleaned data
    """  # noqa
    data = _remove_columns(data, identifiers, target, mode, timeseries_settings, anomaly_detection, dtype_dict)

    data['__mdb_original_index'] = np.arange(len(data))

    for col in _get_columns_to_clean(data, dtype_dict, mode, target):
        # Get and apply a cleaning function for each data type
        # If you want to customize the cleaner, you will likely want to modify ``get_cleaning_func``
        data[col] = data[col].apply(get_cleaning_func(dtype_dict[col], custom_cleaning_functions))

    if timeseries_settings.is_timeseries:
        data = clean_timeseries(data, timeseries_settings)

    for col, imputer in imputers.items():
        if col in data.columns:
            cols = [col] + [dep for dep in imputer.dependencies]
            data[col] = imputer.impute(data[cols])

    return data
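
# Usage sketch (illustrative only, not part of lightwood itself): a minimal call to
# ``cleaner`` on a toy dataframe, in train mode and without timeseries settings.
# Assumes the default ``TimeseriesSettings()`` constructor yields a valid
# non-timeseries configuration.
def _example_cleaner_call() -> pd.DataFrame:
    # 'not a number' should become None in the cleaned 'age' column, and the
    # row with a missing target should be dropped because mode == 'train'.
    raw = pd.DataFrame({'age': ['21', 'not a number', '35'],
                        'label': ['a', 'b', None]})
    return cleaner(
        data=raw,
        dtype_dict={'age': dtype.integer, 'label': dtype.categorical},
        pct_invalid=100,
        identifiers={},
        target='label',
        mode='train',
        timeseries_settings=TimeseriesSettings(),
        anomaly_detection=False,
    )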

def _check_if_invalid(new_data: pd.Series, pct_invalid: float, col_name: str):
    """
    Checks how many invalid data points there are. Invalid data points are flagged as "Nones" from the cleaning process (see data/cleaner.py for default).
    If there are too many invalid data points (specified by `pct_invalid`), then an error message will pop up. This is used as a safeguard for very messy data.

    :param new_data: data to check for invalid values.
    :param pct_invalid: maximum percentage of invalid values. If this threshold is surpassed, an exception is raised.
    :param col_name: name of the column to analyze.
    """  # noqa
    chk_invalid = (
        100
        * (len(new_data) - len([x for x in new_data if x is not None]))
        / len(new_data)
    )

    if chk_invalid > pct_invalid:
        err = f'Too many ({chk_invalid}%) invalid values in column {col_name}'
        log.error(err)
        raise Exception(err)


def get_cleaning_func(data_dtype: dtype, custom_cleaning_functions: Dict[str, str]) -> Callable:
    """
    For the provided data type, provide the appropriate cleaning function. Below are the defaults; users can either override this function OR impose a custom block.

    :param data_dtype: The data-type (inferred from a column) as prescribed from ``api.dtype``

    :returns: The appropriate function that will pre-process (clean) data of the specified dtype.
    """  # noqa
    if data_dtype in custom_cleaning_functions:
        clean_func = eval(custom_cleaning_functions[data_dtype])

    elif data_dtype in (dtype.date, dtype.datetime):
        clean_func = _standardize_datetime

    elif data_dtype in (dtype.float, dtype.num_tsarray):
        clean_func = _clean_float

    elif data_dtype in (dtype.integer,):
        clean_func = _clean_int

    elif data_dtype in (dtype.num_array,):
        clean_func = _standardize_num_array

    elif data_dtype in (dtype.cat_array,):
        clean_func = _standardize_cat_array

    elif data_dtype in (dtype.tags,):
        clean_func = _tags_to_tuples

    elif data_dtype in (dtype.quantity,):
        clean_func = _clean_quantity

    elif data_dtype in (
        dtype.short_text,
        dtype.rich_text,
        dtype.categorical,
        dtype.binary,
        dtype.audio,
        dtype.image,
        dtype.video,
        dtype.cat_tsarray
    ):
        clean_func = _clean_text

    else:
        raise ValueError(f"{data_dtype} is not supported. Check lightwood.api.dtype")

    return clean_func


# ------------------------- #
# Temporal Cleaning
# ------------------------- #


def _standardize_datetime(element: object) -> Optional[float]:
    """
    Parses an expected date-time element. Intakes an element that can in theory be anything.
    """
    try:
        date = parse_dt(str(element))
    except Exception:
        try:
            date = datetime.datetime.utcfromtimestamp(element)
        except Exception:
            return None

    return date.timestamp()


# ------------------------- #
# Tags/Sequences
# ------------------------- #

# TODO Make it split on something other than commas
def _tags_to_tuples(tags_str: str) -> Tuple[str]:
    """
    Converts comma-separated values into a tuple to preserve a sequence/array.

    Ex:
    >> x = 'apples, oranges, bananas'
    >> _tags_to_tuples(x)
    >> ('apples', 'oranges', 'bananas')
    """
    try:
        return tuple([x.strip() for x in tags_str.split(",")])
    except Exception:
        return tuple()
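
# Illustrative sketch (not part of lightwood itself) of how the dispatch above is
# typically used: look up a cleaning function for a dtype and apply it element-wise
# to a pandas Series, exactly as ``cleaner`` does column by column.
def _example_apply_cleaning_func() -> pd.Series:
    values = pd.Series(['1', '2.5', 'oops', None])
    clean_int = get_cleaning_func(dtype.integer, custom_cleaning_functions={})
    # Malformed or missing entries come back as None: [1, 2, None, None]
    return values.apply(clean_int)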

def _standardize_num_array(element: object) -> Optional[Union[List[float], float]]:
    """
    Given an array of numbers in the form ``[1, 2, 3, 4]``, converts into a numerical sequence.

    :param element: An array-like element in a sequence
    :returns: standardized array OR scalar number IF edge case

    Ex of edge case:
    >> element = [1]
    >> _standardize_num_array(element)
    >> 1
    """
    try:
        element = str(element)
        element = element.rstrip("]").lstrip("[")
        element = element.rstrip(" ").lstrip(" ")
        element = element.replace(", ", " ").replace(",", " ")

        # Handles cases where the "array" is a single number
        if " " not in element:
            element = _clean_float(element)
        else:
            element = [float(x) for x in element.split(" ")]
    except Exception:
        pass

    return element


def _standardize_cat_array(element: List) -> Optional[List[str]]:
    """
    Given an array, replace non-string elements with string-cast (or, failing that, None) tokens. None values are kept.

    :param element: An array-like element in a sequence
    :returns: standardized array OR single label IF edge case

    Ex of edge case:
    >> element = ['a']
    >> _standardize_cat_array(element)
    >> 'a'
    """
    if len(element) == 1:
        return element[0]
    else:
        new_element = []
        for sub_elt in element:
            try:
                new_sub_elt = str(sub_elt) if sub_elt else None
                new_element.append(new_sub_elt)
            except TypeError:
                new_element.append(None)
        element = new_element

    return element


# ------------------------- #
# Integers/Floats/Quantities
# ------------------------- #

def _clean_float(element: object) -> Optional[float]:
    """
    Given an element, converts it into float numeric format. If the element is NaN or inf, then returns None.
    """
    try:
        cleaned_float = text.clean_float(element)
        if is_nan_numeric(cleaned_float):
            return None
        return cleaned_float
    except Exception:
        return None


def _clean_int(element: object) -> Optional[int]:
    """
    Given an element, converts it into integer numeric format. If the element is NaN or inf, then returns None.
    """
    element = _clean_float(element)
    if element is not None:
        element = int(element)
    return element


def _clean_quantity(element: object) -> Optional[float]:
    """
    Given a quantity, clean and convert it into float numeric format. If the element is NaN or inf, then returns None.
    """
    element = float(re.sub("[^0-9.,]", "", str(element)).replace(",", "."))
    return _clean_float(element)


# ------------------------- #
# Text
# ------------------------- #
def _clean_text(element: object) -> Union[str, None]:
    if isinstance(element, str):
        return element
    elif element is None or element != element:
        return None
    else:
        return str(element)
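
# Illustrative sketch (not part of lightwood itself) of the scalar cleaners'
# behaviour documented above: quantities keep only their numeric part, invalid
# floats become None, and single-element arrays collapse to a scalar.
def _example_scalar_cleaners() -> None:
    assert _clean_quantity('$1,5 per kg') == 1.5
    assert _clean_float('not a number') is None
    assert _standardize_num_array('[1, 2, 3]') == [1.0, 2.0, 3.0]
    assert _standardize_num_array([7]) == 7.0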

# ------------------------- #
# Other helpers
# ------------------------- #
def _rm_rows_w_empty_targets(df: pd.DataFrame, target: str) -> pd.DataFrame:
    """
    Drop any rows whose target value is unknown. Targets are necessary to train.

    :param df: The input dataframe including the target value
    :param target: the column name that is the output target variable

    :returns: Data with any rows missing the target removed
    """
    # Compare length before/after
    len_before = len(df)

    # Use Pandas ``dropna`` to omit any rows with missing values for targets; these cannot be trained on
    df = df.dropna(subset=[target])

    # Compare length with after
    len_after = len(df)
    nr_removed = len_before - len_after

    if nr_removed != 0:
        log.warning(
            f"Removed {nr_removed} rows because target was missing. Training on these rows is not possible."
        )  # noqa

    return df


def _remove_columns(data: pd.DataFrame, identifiers: Dict[str, object], target: str,
                    mode: str, timeseries_settings: TimeseriesSettings, anomaly_detection: bool,
                    dtype_dict: Dict[str, dtype]) -> pd.DataFrame:
    """
    Drop columns we don't want to use in order to train or predict

    :param data: The raw data
    :param dtype_dict: Type information for each column
    :param identifiers: A dict containing all identifier typed columns
    :param target: The target column
    :param mode: Can be "predict" or "train"
    :param timeseries_settings: Timeseries related settings, only relevant for timeseries predictors, otherwise can be the default object
    :param anomaly_detection: Are we detecting anomalies with this predictor?

    :returns: A (new) dataframe without the dropped columns
    """  # noqa
    data = deepcopy(data)
    to_drop = [*[x for x in identifiers.keys() if x != target],
               *[x for x in data.columns if x in dtype_dict and dtype_dict[x] == dtype.invalid]]
    exceptions = ["__mdb_make_predictions"]
    if timeseries_settings.group_by is not None:
        exceptions += timeseries_settings.group_by

    to_drop = [x for x in to_drop if x in data.columns and x not in exceptions]
    log.info(f'Dropping features: {to_drop}')
    data = data.drop(columns=to_drop)

    if mode == "train":
        data = _rm_rows_w_empty_targets(data, target)
    if mode == "predict":
        if (
            target in data.columns
            and (not timeseries_settings.is_timeseries or not timeseries_settings.use_previous_target)
            and not anomaly_detection
        ):
            data = data.drop(columns=[target])

    # Drop extra columns
    for name in list(data.columns):
        if name not in dtype_dict and name not in exceptions:
            data = data.drop(columns=[name])

    return data


def _get_columns_to_clean(data: pd.DataFrame, dtype_dict: Dict[str, dtype], mode: str, target: str) -> List[str]:
    """
    :param data: The raw data
    :param dtype_dict: Type information for each column
    :param target: The target column
    :param mode: Can be "predict" or "train"

    :returns: A list of columns that we want to clean
    """  # noqa
    cleanable_columns = []
    for name, _ in dtype_dict.items():
        if mode == "predict":
            if name == target:
                continue
        if name in data.columns:
            cleanable_columns.append(name)
    return cleanable_columns


def clean_timeseries(df: pd.DataFrame, tss: TimeseriesSettings) -> pd.DataFrame:
    """
    All timeseries-specific cleaning logic goes here.

    Currently:
    1) Any row with `nan`-valued order-by measurements is dropped.

    :param df: data.
    :param tss: timeseries settings

    :return: cleaned data.
    """
    invalid_rows = []
    for idx, row in df.iterrows():
        if pd.isna(row[tss.order_by[0]]):
            invalid_rows.append(idx)

    df = df.drop(invalid_rows)
    return df
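
# Illustrative sketch (not part of lightwood itself): rows whose order-by column
# is NaN are dropped by ``clean_timeseries``. Assumes a ``TimeseriesSettings``
# object can be constructed with just ``is_timeseries`` and ``order_by`` set.
def _example_clean_timeseries() -> pd.DataFrame:
    df = pd.DataFrame({'T': [1.0, None, 3.0], 'y': [10, 20, 30]})
    tss = TimeseriesSettings(is_timeseries=True, order_by=['T'])
    # Only the first and third rows survive; the NaN-ordered row is dropped.
    return clean_timeseries(df, tss)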