# Source code for lightwood.encoder.datetime.datetime

import datetime
import calendar
from typing import Optional
import torch
from lightwood.encoder.base import BaseEncoder
from lightwood.helpers.general import is_none


class DatetimeEncoder(BaseEncoder):
    """
    This encoder produces an encoded representation for timestamps.

    The approach consists of decomposing each timestamp into its constituent
    units (year, month, day, weekday, hour, minute, second) and describing each
    of those with a single value that represents its magnitude within a
    sensible cycle length (e.g. ``hour / 24``).

    A missing timestamp is encoded as an all-zero vector, and an all-zero
    vector is decoded back to ``None``.
    """

    def __init__(self, is_target: bool = False):
        """
        :param is_target: forwarded unchanged to ``BaseEncoder.__init__``.
        """
        super().__init__(is_target)
        # Order of the components inside every encoded vector.
        self.fields = ['year', 'month', 'day', 'weekday', 'hour', 'minute', 'second']
        # Divisors used to scale each component. 'day' has no entry because it
        # is scaled by the number of days of the specific month instead.
        self.constants = {'year': 3000.0, 'month': 12.0, 'weekday': 7.0,
                          'hour': 24.0, 'minute': 60.0, 'second': 60.0}
        # One output value per field.
        self.output_size = len(self.fields)

    def prepare(self, priming_data):
        """
        Mark the encoder as prepared; nothing is learned from the data.

        :param priming_data: unused by this encoder.
        :raises Exception: if the encoder has already been prepared.
        """
        if self.is_prepared:
            raise Exception('You can only call "prepare" once for a given encoder.')

        self.is_prepared = True

    def encode(self, data):
        """
        Encode every timestamp in ``data``.

        :param data: # @TODO: receive a consistent data type here; currently either list of lists or pd.Series w/lists
        :return: a ``torch.Tensor`` with one encoded row per element of ``data``
        :raises Exception: if ``prepare`` has not been called yet.
        """
        if not self.is_prepared:
            raise Exception('You need to call "prepare" before calling "encode" or "decode".')

        return torch.Tensor([self.encode_one(unix_timestamp) for unix_timestamp in data])

    def encode_one(self, unix_timestamp: Optional[float]):
        """
        Encode a single unix timestamp.

        :param unix_timestamp: timestamp with a resolution of seconds, or a
            value for which ``is_none`` is true to signal a missing datum.
        :return: a list with one number per entry of ``self.fields``; all
            zeros when the timestamp is missing.
        """
        if is_none(unix_timestamp):
            return [0] * len(self.fields)

        c = self.constants
        # No tz is passed, so the timestamp is interpreted in the platform's
        # local time zone.
        date = datetime.datetime.fromtimestamp(unix_timestamp)
        # Number of days in this particular month, so the last day maps to 1.0.
        day_constant = calendar.monthrange(date.year, date.month)[1]
        return [
            date.year / c['year'],
            date.month / c['month'],
            date.day / day_constant,
            date.weekday() / c['weekday'],
            date.hour / c['hour'],
            date.minute / c['minute'],
            date.second / c['second'],
        ]

    def decode(self, encoded_data, return_as_datetime=False):
        """
        Decode a batch of encoded vectors.

        :param encoded_data: tensor-like object exposing ``shape``, ``squeeze``
            and ``tolist``; a leading dimension of size one is dropped when
            the input has more than two dimensions.
        :param return_as_datetime: forwarded to ``decode_one``.
        :return: a list with one decoded value per encoded vector.
        """
        if len(encoded_data.shape) > 2 and encoded_data.shape[0] == 1:
            encoded_data = encoded_data.squeeze(0)

        return [
            self.decode_one(vector, return_as_datetime=return_as_datetime)
            for vector in encoded_data.tolist()
        ]

    def decode_one(self, vector, return_as_datetime=False):
        """
        Decode a single encoded vector.

        Every component is rounded and clamped to its valid range, so vectors
        that are slightly off (e.g. model predictions) still yield a valid
        date. The weekday component (index 3) is ignored because it is implied
        by year, month and day.

        :param vector: sequence of numbers ordered as ``self.fields``.
        :param return_as_datetime: when ``True`` a ``datetime.datetime`` is
            returned, otherwise a unix timestamp rounded to whole seconds.
        :return: ``None`` for the all-zero (missing value) vector, otherwise
            the decoded datetime or timestamp.
        """
        # Only the exact all-zero vector encodes a missing value. Checking the
        # sum would also match vectors whose components cancel each other out.
        if not any(vector):
            return None

        c = self.constants
        # datetime.datetime only accepts years within [MINYEAR, MAXYEAR]; a
        # year of 0 (or above 9999) would raise ValueError.
        year = max(datetime.MINYEAR, min(datetime.MAXYEAR, round(vector[0] * c['year'])))
        month = max(1, min(12, round(vector[1] * c['month'])))
        day_constant = calendar.monthrange(year, month)[-1]
        day = max(1, min(round(vector[2] * day_constant), day_constant))
        hour = max(0, min(23, round(vector[4] * c['hour'])))
        minute = max(0, min(59, round(vector[5] * c['minute'])))
        second = max(0, min(59, round(vector[6] * c['second'])))

        dt = datetime.datetime(year=year, month=month, day=day,
                               hour=hour, minute=minute, second=second)

        if return_as_datetime is True:
            return dt
        # NOTE(review): timestamp() on a naive datetime goes through the local
        # time zone and may still fail for extreme years on some platforms -
        # confirm whether that needs handling here.
        return round(dt.timestamp())