Source code for spacekit.preprocessor.encode

import os
import json
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
import numpy as np

from spacekit.logger.log import Logger


def boolean_encoder(x):
    if x in [True, "True", "T", "t"]:
        return 1
    else:
        return 0


def nan_encoder(x, truevals):
    if x in truevals:
        return 1
    else:
        return 0
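
# Illustrative note (not part of the original module): `boolean_encoder` maps any of
# True/"True"/"T"/"t" to 1 and everything else (including False and NaN) to 0, while
# `nan_encoder` maps any value found in `truevals` to 1 and anything else (typically
# NaN) to 0. For example:
#
#     boolean_encoder("T")                 # -> 1
#     boolean_encoder(False)               # -> 0
#     nan_encoder("WFC", ["WFC"])          # -> 1
#     nan_encoder(float("nan"), ["WFC"])   # -> 0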


def encode_booleans(df, cols, special=False, replace=False, rename="", verbose=False):
    """Encodes boolean-like columns as binary integers (1/0). With ``special=True``,
    any non-null value is encoded as 1 and NaN as 0; otherwise values matching
    True/"True"/"T"/"t" are encoded as 1 and everything else as 0. Encoded columns
    get a suffix (``rename`` if given, else "_enc"); if ``replace`` is True the
    original columns are dropped and, when no ``rename`` suffix is given, the
    encoded columns take the original column names.
    """
    cols = [c for c in cols if c in df.columns]
    if verbose:
        print(f"\nNaNs to be NaNdled:\n{df[cols].isna().sum()}\n")
    df_bool = df[cols].copy()
    encoded_cols = []
    sfx = "_enc" if not rename else rename
    for col in cols:
        enc_col = f"{col}{sfx}"
        encoded_cols.append(enc_col)
        if special is True:
            truevals = list(df_bool[col].value_counts().index)
            df_bool[enc_col] = df_bool[col].apply(lambda x: nan_encoder(x, truevals))
        else:
            df_bool[enc_col] = df_bool[col].apply(lambda x: boolean_encoder(x))
    # merge back into original dataframe
    df_bool.drop(cols, axis=1, inplace=True)
    df = pd.concat([df, df_bool], axis=1)
    if replace is True:
        df.drop(cols, axis=1, inplace=True)
        if not rename:  # make encoded colnames same as the originals
            df.rename(dict(zip(encoded_cols, cols)), axis=1, inplace=True)
            encoded_cols = cols
    return df
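
# Example usage (illustrative sketch; the dataframe below is hypothetical, not part of
# spacekit):
#
#     df = pd.DataFrame({"subarray": [True, "F", "T", None]})
#     df = encode_booleans(df, ["subarray"], replace=True)
#     # df["subarray"] is now [1, 0, 1, 0]; with special=True, every non-null value
#     # would map to 1 and NaN/None to 0 instead.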


def encode_target_data(y_train, y_test):
    """Label encodes target class training and test data for multi-classification models.

    Parameters
    ----------
    y_train : dataframe or ndarray
        training target data
    y_test : dataframe or ndarray
        test target data

    Returns
    -------
    ndarrays
        y_train, y_test
    """
    # label encode class values as integers
    encoder = LabelEncoder()
    encoder.fit(y_train)
    y_train_enc = encoder.transform(y_train)
    y_train = to_categorical(y_train_enc)
    # test set
    encoder.fit(y_test)
    y_test_enc = encoder.transform(y_test)
    y_test = to_categorical(y_test_enc)
    # ensure train/test targets have correct shape (4 bins)
    print(y_train.shape, y_test.shape)
    return y_train, y_test
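
# Example usage (illustrative sketch; the label arrays below are hypothetical). Note
# that the encoder is re-fit on y_test, so both sets should contain the same classes
# for the integer mappings to agree.
#
#     y_train = np.array(["bin0", "bin1", "bin2", "bin3"])
#     y_test = np.array(["bin0", "bin2", "bin1", "bin3"])
#     y_train, y_test = encode_target_data(y_train, y_test)
#     # each target is now one-hot encoded with shape (n_samples, 4)
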
class PairEncoder:
    def __init__(self, name="PairEncoder", **log_kws):
        self.__name__ = name
        self.arr = None
        self.transformed = None
        self.invpairs = None
        self.inversed = None
        self.log = Logger(self.__name__, **log_kws).spacekit_logger()

    def lambda_func(self, inverse=False):
        if inverse is False:

            def L(x):
                return self.keypairs[x]

            return [L(a) for a in self.arr]
        else:
            self.inverse_pairs()

            def inv(i):
                return self.invpairs[i]

            return [inv(b) for b in self.transformed]

    def inverse_pairs(self):
        self.invpairs = {}
        for key, value in self.keypairs.items():
            self.invpairs[value] = key
        return self.invpairs

    def handle_unknowns(self, unknowns):
        uvals = np.unique(self.arr[unknowns])
        self.log.warning(f"Found unknown values:\n {uvals}")
        try:
            for u in uvals:
                add_encoding = max(list(self.keypairs.values())) + 1
                self.keypairs[u] = add_encoding
            self.classes_ = list(self.keypairs.keys())
        except Exception as e:
            self.log.error("Unable to add encoding for unknown value(s)", e)

    def fit(self, data, keypairs, axiscol=None, handle_unknowns=True):
        if isinstance(data, pd.DataFrame):
            if axiscol is None:
                self.log.error(
                    "Must indicate which column to fit if `data` is a `dataframe`."
                )
                return
            try:
                self.arr = np.asarray(data[axiscol], dtype=object)
            except Exception as e:
                self.log.error(e)
        elif isinstance(data, np.ndarray):
            if len(data.shape) > 1:
                if data.shape[-1] > 1:
                    if axiscol is None:
                        self.log.error("Must specify index using `axiscol`")
                        return
                    else:
                        self.arr = np.asarray(data[:, axiscol], dtype=object)
            else:
                self.arr = np.asarray(data, dtype=object)
        else:
            self.log.error("Invalid Type: `data` must be an array or dataframe.")
            return
        self.keypairs = keypairs
        self.classes_ = list(self.keypairs.keys())
        unknowns = np.where([a not in self.classes_ for a in self.arr])[0]
        if unknowns.shape[0] > 0:
            if handle_unknowns is True:
                self.handle_unknowns(unknowns)
            else:
                self.log.error(
                    f"Found unknown values in {axiscol}:\n {self.arr[unknowns]}"
                )
                return
        try:
            self.unique = np.unique(self.arr)
        except Exception as e:
            self.log.error(e)
        return self

    def transform(self):
        if self.arr is None:
            self.log.error("Must fit the data first.")
            return
        self.transformed = self.lambda_func()
        return self.transformed

    def inverse_transform(self):
        self.inversed = self.lambda_func(inverse=True)
        return self.inversed

    def fit_transform(self, data, keypairs, axiscol=None):
        self.fit(data, keypairs, axiscol=axiscol)
        self.transform()


class CategoricalEncoder:
    def __init__(
        self,
        data,
        fkeys=[],
        names=[],
        drop=False,
        rename=False,
        keypair_file=None,
        encoding_pairs=None,
        verbose=0,
        name="CategoricalEncoder",
        **log_kws,
    ):
        self.data = data
        self.fkeys = fkeys
        self.names = names
        self.drop = drop
        self.rename = rename
        self.keypair_file = keypair_file
        self.encoding_pairs = encoding_pairs
        self.verbose = verbose
        self.__name__ = name
        self.log = Logger(self.__name__, **log_kws).spacekit_logger()
        self.encodings = dict(zip(self.fkeys, self.names))
        self.df = self.categorical_data()

    def categorical_data(self):
        """Makes a copy of the input dataframe and extracts only the categorical
        features based on the column names in `fkeys`.

        Returns
        -------
        df : dataframe
            dataframe with only the categorical feature columns
        """
        return self.data.copy()[self.fkeys]

    def rejoin_original(self):
        encoded = list(self.encodings.values())
        originals = list(self.encodings.keys())
        self.df.drop(originals, axis=1, inplace=True)
        self.df = self.data.join(self.df, how="left")
        if self.verbose:
            self.display_encoding()
        if self.drop is True:
            self.df.drop(originals, axis=1, inplace=True)
        if self.rename is True:
            self.df.rename(dict(zip(encoded, originals)), axis=1, inplace=True)

    def _encode_features(self):
        """Encodes input features matching column names assigned to the object's
        ``encodings`` keys.

        Returns
        -------
        dataframe
            original dataframe with all categorical type features label-encoded.
        """
        if self.encoding_pairs is None:
            self.log.error(
                "encoding_pairs attr must be instantiated with key-value pairs"
            )
            return
        self.log.debug("Encoding categorical features...")
        for col, name in self.encodings.items():
            keypairs = self.encoding_pairs[col]
            enc = PairEncoder()
            enc.fit_transform(self.df, keypairs, axiscol=col)
            self.df[name] = enc.transformed
            if self.verbose:
                self.log.debug(f"*** {col} --> {name} ***")
                self.log.debug(
                    f"\n\nORIGINAL:\n{self.df[col].value_counts()}\n\nENCODED:\n{self.df[name].value_counts()}\n"
                )
        self.rejoin_original()
        return self.df

    def display_encoding(self):
        self.log.info("---" * 7)
        for k, v in self.encodings.items():
            res = list(
                zip(
                    self.df[v].value_counts(),
                    self.df[v].unique(),
                    self.df[k].value_counts(),
                    self.df[k].unique(),
                )
            )
            self.log.info(f"{k}<--->{v}")
            self.log.info("#VAL\t\tENC\t\t#VAL\t\tORDINAL")
            for r in res:
                string = "\t\t".join(str(i) for i in r)
                self.log.info(string)
            self.log.info("---" * 7)

    def load_keypair_file(self):
        if os.path.exists(self.keypair_file):
            with open(self.keypair_file, "r") as j:
                self.encoding_pairs = json.load(j)

    def save_keypair_file(self, fpath):
        with open(fpath, "w") as f:
            json.dump(self.encoding_pairs, f)
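
# Example usage of PairEncoder (illustrative sketch; the dataframe and keypairs below
# are hypothetical):
#
#     data = pd.DataFrame({"detector": ["uvis", "wfc", "ir"]})
#     enc = PairEncoder()
#     enc.fit_transform(
#         data, {"hrc": 0, "ir": 1, "sbc": 2, "uvis": 3, "wfc": 4}, axiscol="detector"
#     )
#     enc.transformed          # -> [3, 4, 1]
#     enc.inverse_transform()  # -> ["uvis", "wfc", "ir"]
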
class HstSvmEncoder(CategoricalEncoder):
    """Categorical encoding class for HST Single Visit Mosaic regression test data inputs."""

    def __init__(
        self,
        data,
        fkeys=["category", "detector", "wcstype"],
        names=["cat", "det", "wcs"],
        drop=False,
        rename=False,
        keypair_file=None,
        encoding_pairs=None,
        **log_kws,
    ):
        """Instantiates an HstSvmEncoder class object.

        Parameters
        ----------
        data : dataframe
            input data containing features (columns) to be encoded
        fkeys : list
            categorical-type column names (str) to be encoded
        names : list
            new names to assign to the encoded versions of the categorical columns
        """
        super().__init__(
            data,
            fkeys=fkeys,
            names=names,
            drop=drop,
            rename=rename,
            keypair_file=keypair_file,
            encoding_pairs=encoding_pairs,
            name="HstSvmEncoder",
            **log_kws,
        )
        self.make_keypairs()
        self.encode_categories()

    def __repr__(self):
        return (
            "encodings: %s \n category_keys: %s \n detector_keys: %s \n wcs_keys: %s"
            % (self.encodings, self.category_keys, self.detector_keys, self.wcs_keys)
        )

    def encode_features(self):
        return super()._encode_features()

    def make_keypairs(self):
        """Instantiates key-pair dictionaries for each of the categorical features
        listed in `fkeys`. Except for the target classification "category" feature,
        each string value is assigned an integer in alphabetical and increasing order.
        For the image target category feature, an integer is assigned to each
        abbreviated version of the strings collected from the MAST archive. The extra
        abbreviation step is done for debugging and analysis purposes (value counts of
        the abbreviated versions are printed to stdout before the final encoding).

        Returns
        -------
        dict
            key-pair values for image target category classification (category),
            detectors and wcstype.
        """
        self.category_keys = {
            "C": 0,
            "SS": 1,
            "I": 2,
            "U": 3,
            "SC": 4,
            "S": 5,
            "GC": 6,
            "G": 7,
        }
        self.detector_keys = {"hrc": 0, "ir": 1, "sbc": 2, "uvis": 3, "wfc": 4}
        self.wcs_keys = {
            "a posteriori": 0,
            "a priori": 1,
            "default a": 2,
            "not aligned": 3,
        }
        self.encoding_pairs = {
            "category": self.category_keys,
            "detector": self.detector_keys,
            "wcstype": self.wcs_keys,
        }

    def init_categories(self):
        """Assigns abbreviated character code as key-pair value for each type of
        target category classification (as determined by data on MAST archive).

        Returns
        -------
        dict
            key-pair values for image target category classification.
        """
        return {
            "CALIBRATION": "C",
            "SOLAR SYSTEM": "SS",
            "ISM": "I",
            "EXT-MEDIUM": "I",
            "STAR": "S",
            "EXT-STAR": "S",
            "UNIDENTIFIED": "U",
            "STELLAR CLUSTER": "SC",
            "EXT-CLUSTER": "SC",
            "CLUSTER OF GALAXIES": "GC",
            "GALAXY": "G",
            "None": "U",
        }

    def encode_categories(self, cname="category", sep=";"):
        """Transforms the raw string inputs from MAST target category naming
        conventions into an abbreviated form. For example,
        `CLUSTER OF GALAXIES;GRAVITATIONA` becomes `GC` for galaxy cluster; and
        `STELLAR CLUSTER;GLOBULAR CLUSTER` becomes `SC` for stellar cluster. This
        serves to group similar but differently named objects into a discrete set of
        8 possible categorizations. The 8 categories will then be encoded into integer
        values in the final encoding step (machine learning inputs must be numeric).

        Returns
        -------
        dataframe
            original dataframe with category input feature values encoded.
        """
        CAT = {}
        ckeys = self.init_categories()
        for idx, cat in self.df[cname].items():
            c = cat.split(sep)[0]
            if c in ckeys:
                CAT[idx] = ckeys[c]
        df_cat = pd.DataFrame.from_dict(CAT, orient="index", columns=["category"])
        self.df.drop("category", axis=1, inplace=True)
        self.df = self.df.join(df_cat, how="left")
        return self.df
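
# Example usage (illustrative sketch; the dataframe below is hypothetical but uses the
# column names HstSvmEncoder expects by default):
#
#     raw = pd.DataFrame({
#         "category": ["GALAXY;SPIRAL", "STAR;MAIN SEQUENCE"],
#         "detector": ["uvis", "ir"],
#         "wcstype": ["a priori", "not aligned"],
#     })
#     enc = HstSvmEncoder(raw)
#     df_encoded = enc.encode_features()
#     # adds integer-encoded "cat", "det" and "wcs" columns alongside the originals
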
class HstCalEncoder(CategoricalEncoder):
    """Categorical encoding class for HST Calibration in the Cloud Reprocessing inputs."""

    def __init__(
        self,
        data,
        fkeys=["DETECTOR", "SUBARRAY", "DRIZCORR", "PCTECORR"],
        names=["detector", "subarray", "drizcorr", "pctecorr"],
        keypair_file=None,
        encoding_pairs=None,
        **log_kws,
    ):
        """Instantiates an HstCalEncoder class object.

        Parameters
        ----------
        data : dataframe
            input data containing features (columns) to be encoded
        fkeys : list
            categorical-type column names (str) to be encoded
        names : list
            new names to assign to the encoded versions of the categorical columns
        """
        self.fkeys = fkeys
        self.names = names
        super().__init__(
            data,
            fkeys=fkeys,
            names=names,
            keypair_file=keypair_file,
            encoding_pairs=encoding_pairs,
            name="HstCalEncoder",
            **log_kws,
        )
        self.make_keypairs()

    def __repr__(self):
        return "encodings: %s \n keypairs: %s \n" % (
            self.encodings,
            self.encoding_pairs,
        )

    def set_calibration_keys(self):
        return {
            "PERFORM": 1,
            "OTHER": 0,
        }

    def set_detector_keys(self):
        return {"UVIS": 1, "WFC": 1, "OTHER": 0}

    def set_subarray_keys(self):
        return {"True": 1, "False": 0}

    def set_crsplit_keys(self):
        return {"NaN": 0, "1.0": 1, "OTHER": 2}

    def set_dtype_keys(self):
        return {"0": 1, "OTHER": 0}

    def set_instr_keys(self):
        return dict(j=0, l=1, o=2, i=3)

    def make_keypairs(self):
        self.encoding_pairs = dict(
            drizcorr=self.set_calibration_keys(),
            pctecorr=self.set_calibration_keys(),
            detector=self.set_detector_keys(),
            subarray=self.set_subarray_keys(),
            crsplit=self.set_crsplit_keys(),
            dtype=self.set_dtype_keys(),
            instr=self.set_instr_keys(),
        )

    def encode_features(self):
        super()._encode_features()


class JwstEncoder(CategoricalEncoder):
    def __init__(
        self,
        data,
        fkeys=[],
        names=[],
        drop=True,
        rename=True,
        encoding_pairs=None,
        keypair_file=None,
        **log_kws,
    ):
        if not names:
            names = [c + "_enc" for c in fkeys]
        super().__init__(
            data,
            fkeys=fkeys,
            names=names,
            drop=drop,
            rename=rename,
            keypair_file=keypair_file,
            encoding_pairs=encoding_pairs,
            name="JwstEncoder",
            **log_kws,
        )
        # self.make_keypairs()  # for training only

    def make_keypairs(self):
        """Instantiates key-pair dictionaries for each of the categorical features."""
        self.abbreviate_strings(cname="subarray", ckeys=["MASK", "SUB", "WFSS"])
        keymaker = CategoricalKeymaker(
            self.df, list(self.df.columns), recast=["channel"]
        )
        self.encoding_pairs = keymaker.encode_categories()

    def abbreviate_strings(self, cname="subarray", ckeys=["MASK", "SUB", "WFSS"]):
        """Abbreviates the original values based on the starting characters of the
        string. For example, if "MASK" is passed as a value in the `ckeys` keyword arg,
        any value starting with "MASK" within the `cname` column is shortened to
        "MASK". For the "subarray" column in JWST, this reduces the number of possible
        encodings to 7 values, which are then encoded into integers in the final
        encoding step. The abbreviated values are applied to ``self.df`` in place.
        """
        for key in ckeys:
            self.df.loc[self.df[cname].str.startswith(key), cname] = key

    def encode_features(self):
        super()._encode_features()


class CategoricalKeymaker:
    def __init__(
        self,
        df,
        cols,
        keypair_file=None,
        recast=[],
        codify=[],
        forced_zeros={},
        name="CategoricalKeymaker",
        **log_kws,
    ):
        self.df = df
        self.cols = [c for c in cols if c in self.df.columns]
        self.keypair_file = keypair_file
        self.recast = recast
        self.codify = codify
        self.forced_zeros = forced_zeros
        self.__name__ = name
        self.log = Logger(self.__name__, **log_kws).spacekit_logger()
        self.non_defaults()
        self.set_default_kwargs()
        self.set_recast_kwargs()
        self.set_codify_kwargs()
        self.set_encoding_kwargs()

    def load_keypair_data(self, keypair_file):
        if os.path.exists(keypair_file):
            with open(keypair_file, "r") as j:
                self.encoding_pairs = json.load(j)

    def save_keypair_data(self, fpath):
        with open(fpath, "w") as f:
            json.dump(self.encoding_pairs, f)

    def encode_categories(self, inverse=True):
        self.encoding_pairs = {}
        for col in self.cols:
            try:
                enc_kwargs = self.encoding_kwargs.get(col, self.default_kwargs)
                encoding_key = self.make_encoding_key(col, **enc_kwargs)
                self.encoding_pairs[col] = encoding_key
            except KeyError:
                self.log.error(f"Key Error occurred while encoding {col}")
        if inverse is True:
            enc_pairs = {}
            for col, pairs in self.encoding_pairs.items():
                enc_pairs[col] = {}
                for k, v in pairs.items():
                    enc_pairs[col][v] = k
            self.encoding_pairs = enc_pairs
        return self.encoding_pairs

    def make_encoding_key(self, col, forced_zero="NONE", recast=None, codify=None):
        # convert values / apply datatype recasting
        if recast:
            self.recast_data(col, **recast)
        keypairs = None
        if codify:
            # convert long string to abbreviated string prior to numeric encoding
            coded, keypairs = self.codify_keypairs(
                col=col, forced_zero=forced_zero, **codify
            )
            self.df[col + "_c"] = self.df[col].apply(
                lambda x: self.abbreviator(x, keypairs)
            )
            col += "_c"
        else:
            coded = self.make_default_keypairs(col, zero_val=forced_zero)
        encoding_key = dict(zip(coded.values(), coded.keys()))
        if keypairs:
            for i, j in encoding_key.items():
                for k, v in keypairs.items():
                    if j == v:
                        encoding_key[i] = {j: k}
        return encoding_key

    def get_inversed_keypairs(self):
        self.inversed_pairs = dict()
        for column, keypairs in self.encoding_pairs.items():
            inverse_pairs = self.inverse_keypairs(keypairs)
            self.inversed_pairs[column] = inverse_pairs

    def inverse_keypairs(self, keypairs):
        inverse_pairs = {}
        for k, v in keypairs.items():
            if isinstance(v, dict):
                for i, j in v.items():
                    inverse_pairs[j] = k
            else:
                inverse_pairs[v] = k
        return inverse_pairs

    def keypair_encoder(self, x, keypairs, col):
        if x not in list(keypairs.values()):
            self.log.warning(f"New value not in keypairs - adding {x}...")
            keys = sorted(int(k) for k in list(keypairs.keys()))
            new_key = keys[-1] + 1
            keypairs[str(new_key)] = x
            self.encoding_pairs[col] = keypairs
            self.inversed_pairs[col] = self.inverse_keypairs(keypairs)
            return new_key
        else:
            return self.inversed_pairs[col].get(x)

    def encode_from_keypairs(self):
        self.get_inversed_keypairs()
        for col in self.cols:
            keypairs = self.encoding_pairs.get(col, None)
            if keypairs:
                self.df[col] = self.df[col].apply(
                    lambda x: self.keypair_encoder(x, keypairs, col)
                )
        return self.df

    def non_defaults(self):
        non_default_cols = self.recast + self.codify + list(self.forced_zeros.keys())
        self.non_default_cols = list(set(non_default_cols))

    def set_encoding_kwargs(self):
        self.encoding_kwargs = dict()
        for col in self.df.columns:
            if col in self.non_default_cols:
                recast_kwargs = self.recast_kwargs if col in self.recast else None
                codify_kwargs = self.codify_kwargs if col in self.codify else None
                forced_zero = self.forced_zeros.get(col, "NONE")
                self.encoding_kwargs.update(
                    {
                        col: dict(
                            forced_zero=forced_zero,
                            recast=recast_kwargs,
                            codify=codify_kwargs,
                        )
                    }
                )

    def set_default_kwargs(self, forced_zero="NONE", recast=None, codify=None):
        self.default_kwargs = dict(
            forced_zero=forced_zero, recast=recast, codify=codify
        )

    def set_recast_kwargs(
        self, stringify=True, splitify=True, make_upper=True, splitter=".", i=0
    ):
        self.recast_kwargs = dict(
            stringify=stringify,
            splitify=splitify,
            make_upper=make_upper,
            splitter=splitter,
            i=i,
        )

    def set_codify_kwargs(self, abbr=True, keep_orig=False, inverse=True):
        self.codify_kwargs = dict(
            abbr=abbr,
            keep_orig=keep_orig,
            inverse=inverse,
        )

    def find_unique_values(self, col="visitype"):
        val_types = []
        for val in list(self.df[col].value_counts().index):
            if isinstance(val, list):
                for t in val:
                    val_types.append(t)
            else:
                val_types.append(val)
        val_types = sorted(list(set(val_types)))
        vtypes = []
        for v in val_types:
            if not isinstance(v, str):
                v = str(int(v))
            vtypes.append(v)
        return list(set(vtypes)), list(set(val_types))

    def abbreviate_names(self, vtypes, strips=".+"):
        vtypes_new = []
        for v in vtypes:
            if strips:
                v = v.strip(strips)
            words = v.split("_")
            name = ""
            for w in words:
                name += w[0]
            vtypes_new.append(name)
        return vtypes_new

    def match_keypairs(self, vtypes1, vtypes2):
        keypairs = {}
        for v2 in vtypes2:
            keypairs[v2] = []
        for v1 in vtypes1:
            if not isinstance(v1, str):
                v = str(int(v1))
            else:
                v = v1
            keypairs[v].append(v1)
        return keypairs

    def create_keypair_dict(self, col, abbr=True, keep_orig=False, inverse=True):
        keypairs = {}
        if keep_orig is True:
            vtypes2, vtypes = self.find_unique_values(col=col)
        else:
            vtypes, _ = self.find_unique_values(col=col)
            vtypes2 = None
        if vtypes2 is not None:
            keypairs = self.match_keypairs(vtypes, vtypes2)
        vtypes_new = self.abbreviate_names(vtypes) if abbr is True else None
        if vtypes_new is not None:
            for a, b in list(zip(vtypes, vtypes_new)):
                keypairs[b] = [a]
        if inverse is True:
            keypairs_inv = {}
            for k, v in keypairs.items():
                keypairs_inv[v[0]] = k
            keypairs = keypairs_inv
        return keypairs

    def make_default_keypairs(self, col, zero_val="NONE"):
        keys = sorted(list(self.df[col].unique()))
        if zero_val in keys and keys[0] != zero_val:
            try:
                idx = np.where([np.asarray(keys) == zero_val])[1][0]
                self.log.info(f"Moving {zero_val} index from {idx} to 0")
                keys.pop(idx)
                keys.insert(0, zero_val)
            except Exception as e:
                self.log.error(
                    "Unable to locate zero_val index while making default keypairs",
                    str(e),
                )
        vals = list(range(len(keys)))
        keypairs = dict(zip(keys, vals))
        return keypairs

    def codify_keypairs(self, col, forced_zero="NONE", **kwargs):
        # convert long string to abbreviated string prior to numeric encoding
        # e.g. 'PRIME_UNTARGETED': 'PU'
        keypairs = self.create_keypair_dict(col, **kwargs)
        forced_key = keypairs[forced_zero]
        keylist = sorted(list(keypairs.values()))
        if keylist[0] != forced_zero:
            del keypairs[forced_zero]
            keypairs_zero = {forced_zero: forced_key}
            keypairs_zero.update(keypairs)
            keylist = sorted(list(keypairs_zero.values()))
            keypairs = keypairs_zero
        keys = list(dict(enumerate(keylist)).values())
        vals = list(dict(enumerate(keylist)).keys())
        coded_keys = dict(zip(keys, vals))
        return coded_keys, keypairs

    def abbreviator(self, x, keypairs):
        return keypairs.get(x, x)

    def string_encoder(self, x, coded):
        return coded.get(x, x)

    def split_caster(self, x, splitter=".", i=0):
        # split string on splitter, return index i of split string
        # e.g. "13.0" becomes "13"
        return x.split(splitter)[i]

    def string_caster(self, x):
        # convert to string
        if not isinstance(x, str):
            return str(x)
        else:
            return x

    def recast_data(
        self, col, stringify=True, splitify=True, make_upper=True, **kwargs
    ):
        if stringify is True:
            self.df[col] = self.df[col].apply(lambda x: self.string_caster(x))
        if splitify is True:
            self.df[col] = self.df[col].apply(lambda x: self.split_caster(x, **kwargs))
        if make_upper is True:
            self.df[col] = self.df[col].apply(lambda x: x.upper())
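
# Example usage of CategoricalKeymaker with JwstEncoder (illustrative sketch; the
# dataframe and column name below are hypothetical):
#
#     df = pd.DataFrame({"instrument": ["miri", "nircam", "niriss"]})
#     keymaker = CategoricalKeymaker(df, ["instrument"])
#     pairs = keymaker.encode_categories()
#     # pairs -> {"instrument": {"miri": 0, "nircam": 1, "niriss": 2}}
#     enc = JwstEncoder(df, fkeys=["instrument"], encoding_pairs=pairs)
#     enc.encode_features()
#     # enc.df["instrument"] now holds the integer codes 0, 1, 2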