Source code for spacekit.preprocessor.transform
import os
import json
import pandas as pd
import numpy as np
from scipy.ndimage import uniform_filter1d
from sklearn.preprocessing import PowerTransformer
import tensorflow as tf
from astropy.coordinates import SkyCoord
from astropy import units as u
from spacekit.logger.log import Logger
class SkyTransformer:
# calculate sky separation / reference pixel offset statistics
def __init__(self, mission, name="SkyTransformer", **log_kws):
"""_summary_
Parameters
----------
mission : str
Name of mission or observatory, e.g. "JWST", "HST"
product_exp_headers : dict, optional
, by default None
name : str, optional
logging name, by default "SkyTransformer"
"""
self.__name__ = name
self.log = Logger(self.__name__).spacekit_logger(**log_kws)
self.mission = mission
self.pixel_scales = self.image_pixel_scales()
self.instr = None
self.detector = None
self.channel = None
self.count_exposures = True
self.refpix = dict()
self.set_keys()
def set_keys(self, **kwargs):
"""
Set keys used in exposure header dictionary to identify values
(typically derived from FITS file sci headers). Possible keyword
arguments include: instr, detector, channel, band, exp_type, ra, dec, where 'ra','dec'
refer to the fiducial (center pixel coordinate in degrees).
None values will use defaults (see below); unrecognized kwargs
will be ignored.
Defaults:
* instr="INSTRUME"
* detector="DETECTOR"
* channel="CHANNEL"
* band="BAND"
* exp_type="EXP_TYPE"
* ra="CRVAL1" / could also use "RA_REF"
* dec="CRVAL2" / could also use "DEC_REF"
"""
self.instr_key = kwargs.get("instr", "INSTRUME")
self.detector_key = kwargs.get("detector", "DETECTOR")
self.channel_key = kwargs.get("channel", "CHANNEL")
self.band_key = kwargs.get("band", "BAND")
self.exp_key = kwargs.get("exp_type", "EXP_TYPE")
self.ra_key = kwargs.get("ra", "CRVAL1")
self.dec_key = kwargs.get("dec", "CRVAL2")
self.ra_key2 = "RA_REF" if self.ra_key == "CRVAL1" else "CRVAL1"
self.dec_key2 = "DEC_REF" if self.dec_key == "CRVAL2" else "CRVAL2"
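# Example (illustrative, not part of the module): override the default fiducial keys so
# values are read from "RA_REF"/"DEC_REF" instead of "CRVAL1"/"CRVAL2".
#
#     sky = SkyTransformer("JWST")
#     sky.set_keys(ra="RA_REF", dec="DEC_REF")
#     # omitted kwargs keep their defaults; unrecognized kwargs are ignored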
def calculate_offsets(self, product_exp_headers):
"""Given key-value pairs of header info from a set of input exposures,
estimate the fiducial (center pixel coordinates) of the final image product
and calculate pixel offset statistics between inputs and final output using
detector-based footprints and sky separation angles (see the example after this method).
NOTE: the product keys and input exposure keys could be any strings and are used
simply for organization. The fits-related key-value pairs nested within each input
exposure dictionary must contain, at minimum, the instrument and fiducial
ra/dec coordinates (e.g. "INSTRUME", "CRVAL1", "CRVAL2"). The keys themselves
can be custom set using `self.set_keys(**kwargs)` but must match the contents
of the nested dictionary passed into `product_exp_headers`. Typically these are
derived directly from fits file sci headers of the input exposures.
Some missions and instruments require additional information such as "CHANNEL"
(JWST Nircam) or "DETECTOR" (HST) in order to identify the correct pixel scale
and footprint size based on the detector and/or wavelength channel.
Parameters
----------
product_exp_headers : dict
nested dictionary of (typically Level 3) product names (keys),
their input exposures (values) and relevant fits header information
per exposure (key-value pairs).
"""
product_refpix = dict()
for product, exp_headers in product_exp_headers.items():
product_refpix[product] = self.get_pixel_offsets(exp_headers)
return product_refpix
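# Example (illustrative sketch): the product/exposure names and header values below are
# invented for demonstration only.
#
#     sky = SkyTransformer("JWST")
#     headers = {
#         "jw01234-o001_t001_nircam_f150w": {
#             "exp_001": {"INSTRUME": "NIRCAM", "DETECTOR": "NRCB1", "CHANNEL": "SHORT",
#                         "CRVAL1": 210.801, "CRVAL2": 54.349,
#                         "TARG_RA": 210.802, "TARG_DEC": 54.348},
#             "exp_002": {"INSTRUME": "NIRCAM", "DETECTOR": "NRCB2", "CHANNEL": "SHORT",
#                         "CRVAL1": 210.812, "CRVAL2": 54.352,
#                         "TARG_RA": 210.802, "TARG_DEC": 54.348},
#         }
#     }
#     refpix = sky.calculate_offsets(headers)
#     # refpix["jw01234-o001_t001_nircam_f150w"] contains NEXPOSUR, the estimated
#     # fiducial (fx_ra/fy_dec), offset statistics, and metadata from the reference exposure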
def validate_fiducial(self, fiducial, exp):
(ra, dec) = fiducial
if isinstance(ra, float) and isinstance(dec, float):
return True
else:
warning_message = f"Invalid RA/DEC fiducial value ({ra}, {dec}) in {str(exp)}"
if exp == "TARG_RA/TARG_DEC":
self.log.debug(warning_message)
else:
self.log.warning(warning_message)
return False
def get_pixel_offsets(self, exp_data):
if self.count_exposures is True:
refpix = dict(NEXPOSUR=len(list(exp_data.keys())))
else:
refpix = dict()
offsets, targ_offsets, detectors, bands = [], [], [], []
targ_radec = None
bad_fiducials = {}
for exp, data in exp_data.items():
fiducial = (data.get(self.ra_key, data.get(self.ra_key2)), data.get(self.dec_key, data.get(self.dec_key2)))
# only need to set once because consistent across exposures
if targ_radec is None:
targ_radec = (data.get("TARG_RA", ''), data.get("TARG_DEC", ''))
# validate fiducials
if self.validate_fiducial(fiducial, exp) is False:
bad_fiducials[exp] = str(exp)
continue
instr = data[self.instr_key]
detector = data.get(self.detector_key, None)
channel = data.get(self.channel_key, None)
band = data.get(self.band_key, None)
exp_type = data.get(self.exp_key, None)
scale = self.get_scale(
instr, channel=channel, detector=detector, exp_type=exp_type
)
shape = self.data_shapes(instr)
# footprint from shape
footprint = self.footprint_from_shape(fiducial, scale, shape)
exp_data[exp].update(
dict(
fiducial=fiducial,
footprint=footprint,
scale=scale,
)
)
if detector is not None and detector.upper() not in detectors:
detectors.append(detector.upper())
# MIRI MRS: determine bands used: short, long, shortmedium, shortmediumlong
if band is not None:
bands.extend([b.upper() for b in band.split('-') if b.upper() not in bands])
# Throw out any exposures with invalid data
for k in bad_fiducials.keys():
del exp_data[k]
if 'NEXPOSUR' in refpix:
refpix['NEXPOSUR'] -= 1
# if all exposures were bad, return empty dict
if len(exp_data) < 1:
return {}
# find fiducial (final product)
footprints = [v["footprint"] for v in exp_data.values()]
lon_fiducial, lat_fiducial = self.estimate_fiducial(footprints)
refpix["fx_ra"], refpix["fy_dec"] = lon_fiducial, lat_fiducial
# pixel sky sep offsets from estimated fiducial
pcoord = SkyCoord(lon_fiducial, lat_fiducial, unit="deg")
tcoord = None
if self.validate_fiducial(targ_radec, 'TARG_RA/TARG_DEC') is True:
tcoord = SkyCoord(targ_radec[0], targ_radec[1], unit="deg")
for exp, data in exp_data.items():
(ra, dec) = data["fiducial"]
pixel = self.pixel_sky_separation(ra, dec, pcoord, data["scale"])
exp_data[exp]["offset"] = pixel
offsets.append(pixel)
if tcoord:
targ_pixel = self.pixel_sky_separation(ra, dec, tcoord, data["scale"])
exp_data[exp]["targ_offset"] = targ_pixel
targ_offsets.append(targ_pixel)
# fill in metadata for product using reference exposure (usually vals are equal across inputs)
ref_exp = [
k for k, v in exp_data.items() if v["offset"] == np.min(np.asarray(offsets))
][0]
keys = [
k
for k in list(exp_data[ref_exp].keys())
if k not in ["DETECTOR", "BAND", "footprint", "fiducial"]
]
for k in keys:
refpix[k] = exp_data[ref_exp][k]
if len(detectors) > 1:
refpix["DETECTOR"] = "|".join(sorted([d for d in detectors]))
else:
refpix["DETECTOR"] = detectors[0]
if len(bands) > 1:
refpix["BAND"] = "|".join(sorted([b for b in bands], reverse=True))
elif len(bands) == 1:
refpix["BAND"] = bands[0]
else:
refpix["BAND"] = 'NONE'
# offset statistics
offset_stats = self.offset_statistics(offsets)
refpix.update(offset_stats)
if targ_offsets:
targ_offset_stats = self.offset_statistics(targ_offsets, pfx="targ_")
refpix.update(targ_offset_stats)
# experimental
try:
# set default to 0.0 as fallback if calculation fails
refpix["t_offset"] = 0.0
refpix["gs_offset"] = 0.0
refpix["gs_offset"] = self.pixel_sky_separation(
refpix["GS_RA"], refpix["GS_DEC"], pcoord, refpix["scale"]
)
refpix["t_offset"] = self.pixel_sky_separation(
refpix["TARG_RA"], refpix["TARG_DEC"], pcoord, refpix["scale"]
)
except (KeyError, ValueError):
self.log.debug("TARG/GS RA DEC vals missing or NaN - setting to 0.0")
return refpix
def image_pixel_scales(self):
return dict(
HST=dict(ACS=dict(WFC=0.05), WFC3=dict(UVIS=0.04, IR=0.13)),
JWST=dict(
NIRCAM=dict(
SHORT=0.03,
LONG=0.06,
),
MIRI=dict(
GEN=0.11,
MRS=0.196,
),
NIRISS=0.06,
NIRSPEC=0.12,
FGS=0.069,
),
)[self.mission]
def data_shapes(self, instr):
return dict(
JWST=dict(
NIRCAM=(2048, 2048),
MIRI=(1032, 1024),
NIRISS=(2048, 2048),
NIRSPEC=(2048, 2048),
),
HST=dict(
ACS=(4096, 2048), # ACS -> WFC,
WFC3=(4096, 2051), # WFC3 -> UVIS (IR=(1024,1024))
),
)[self.mission][instr]
def get_scale(self, instr, channel=None, detector=None, exp_type=None):
if channel is not None and channel.upper() in ["SHORT", "LONG"]:
return self.pixel_scales[instr][channel.upper()]
elif instr.upper() == "MIRI":
if exp_type in ["MIR_MRS"]:
return self.pixel_scales[instr]["MRS"]
else:
return self.pixel_scales[instr]["GEN"]
elif detector is not None and detector.upper() in ["WFC", "UVIS", "IR"]:
return self.pixel_scales[instr][detector.upper()]
else:
return self.pixel_scales[instr]
@staticmethod
def footprint_from_shape(fiducial, scale, shape):
sep_x = (shape[0] / 2 * scale * u.arcsec).to(u.deg).value
sep_y = (shape[1] / 2 * scale * u.arcsec).to(u.deg).value
ra_ref, dec_ref = fiducial
footprint = np.array(
[
[ra_ref - sep_x, dec_ref - sep_y],
[ra_ref + sep_x, dec_ref - sep_y],
[ra_ref + sep_x, dec_ref + sep_y],
[ra_ref - sep_x, dec_ref + sep_y],
]
)
return footprint
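# Worked example (illustrative): for the NIRCam SHORT channel (0.03 arcsec/pix) and a
# (2048, 2048) detector, the half-width is 2048 / 2 * 0.03 = 30.72 arcsec ~ 0.00853 deg,
# so each footprint corner sits ~0.00853 deg from the fiducial along both axes.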
@staticmethod
def estimate_fiducial(footprints: list):
footprints = np.vstack([foot for foot in footprints])
lon, lat = footprints[:, 0], footprints[:, 1]
lon, lat = np.deg2rad(lon), np.deg2rad(lat)
x = np.cos(lat) * np.cos(lon)
y = np.cos(lat) * np.sin(lon)
z = np.sin(lat)
x_mid = (np.max(x) + np.min(x)) / 2.0
y_mid = (np.max(y) + np.min(y)) / 2.0
z_mid = (np.max(z) + np.min(z)) / 2.0
lon_fiducial = np.rad2deg(np.arctan2(y_mid, x_mid)) % 360.0
lat_fiducial = np.rad2deg(np.arctan2(z_mid, np.sqrt(x_mid**2 + y_mid**2)))
return lon_fiducial, lat_fiducial
@staticmethod
def pixel_sky_separation(ra, dec, p_coords, scale, unit="deg"):
coords = SkyCoord(ra, dec, unit=unit)
skysep_angle = p_coords.separation(coords)
arcsec = skysep_angle.arcsecond
pixel = arcsec / scale
return pixel
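# Worked example (illustrative): a sky separation of 1.0 arcsec at the NIRCam SHORT plate
# scale (0.03 arcsec/pix) corresponds to 1.0 / 0.03 ~ 33.3 pixels of offset.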
@staticmethod
def offset_statistics(offsets, pfx=""):
offsets = np.asarray(offsets)
stats = dict()
stats[f"{pfx}max_offset"] = np.max(offsets)
stats[f"{pfx}mean_offset"] = np.mean(offsets)
stats[f"{pfx}sigma_offset"] = np.std(offsets)
stats[f"{pfx}err_offset"] = np.std(offsets) / np.sqrt(len(offsets))
sigma1_idx = np.where(offsets > np.mean(offsets) + np.std(offsets))[0]
if len(sigma1_idx) > 0:
stats[f"{pfx}sigma1_mean"] = np.mean(offsets[sigma1_idx])
stats[f"{pfx}frac"] = len(offsets[sigma1_idx]) / len(offsets)
else:
stats[f"{pfx}sigma1_mean"] = 0.0
stats[f"{pfx}frac"] = 0.0
return stats
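# Example (illustrative):
#
#     SkyTransformer.offset_statistics([2.0, 3.0, 4.0, 11.0])
#     # {'max_offset': 11.0, 'mean_offset': 5.0, 'sigma_offset': 3.5355...,
#     #  'err_offset': 1.7677..., 'sigma1_mean': 11.0, 'frac': 0.25}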
class Transformer:
def __init__(
self,
data,
cols=None,
ncols=None,
tx_data=None,
tx_file=None,
save_tx=True,
join_data=1,
rename="_scl",
output_path=None,
name="Transformer",
**log_kws,
):
"""Initializes a Transformer class object. Unless the `cols` attribute is empty, it will automatically instantiate some
of the other attributes needed to transform the data. Using the Transformer subclasses instead is recommended (this
class is mainly used as an object with general methods to load or save the transform data as well as instantiate some of
the initial attributes).
Parameters
----------
data : dataframe or numpy.ndarray
input data containing continuous feature vectors to be transformed (may also contain vectors or columns of
categorical and other datatypes as well).
cols : list, optional
column names or array index values of feature vectors to be transformed (i.e. continuous datatype features), by
default None
ncols : list, optional
array index values of the continuous features (used when `data` is a numpy array), by default None
tx_data : dict, optional
transform metadata (lambdas, means, standard deviations) to use directly instead of loading from file, by default None
tx_file : string, optional
path to saved transformer metadata, by default None
save_tx : bool, optional
save the transformer metadata as json file on local disk, by default True
join_data : int, optional
1: join normalized data with remaining columns of original; 2: join with complete original, all columns (requires
renaming)
rename : str or list
if string, will be appended to normalized col names; if list, will rename normalized columns in this order
output_path : string, optional
where to save the transformer metadata, by default None (current working directory)
"""
self.__name__ = name
self.log = Logger(self.__name__, **log_kws).spacekit_logger()
self.data = self.check_shape(data)
self.cols = cols
self.ncols = self.check_columns(ncols=ncols)
self.tx_file = tx_file
self.save_tx = save_tx
self.join_data = join_data
self.rename = rename
self.output_path = output_path
self.tx_data = self.load_transformer_data(tx=tx_data)
self.continuous = self.continuous_data()
self.categorical = self.categorical_data()
def check_shape(self, data):
if len(data.shape) == 1:
if isinstance(data, np.ndarray):
data = data.reshape(1, -1)
elif isinstance(data, pd.Series):
name = data.name
data = pd.DataFrame(
data.values.reshape(1, -1), columns=list(data.index)
)
data["index"] = name
data.set_index("index", inplace=True)
return data
def check_columns(self, ncols=None):
if ncols is not None and isinstance(self.data, np.ndarray):
self.cols = ncols
return ncols
def load_transformer_data(self, tx=None):
"""Loads saved transformer metadata from a dictionary or a json file on local disk.
Returns
-------
dictionary
transform metadata used for applying transformations on new data inputs
"""
if tx:
self.tx_data = tx
return self.tx_data
elif self.tx_file is not None:
with open(self.tx_file, "r") as j:
self.tx_data = json.load(j)
return self.tx_data
else:
return None
def save_transformer_data(self, tx=None, fname="tx_data.json"):
"""Save the transform metadata to a json file on local disk. Typical use-case is when you need to transform new inputs
prior to generating a prediction but don't have access to the original dataset used to train the model.
Parameters
----------
tx : dictionary
statistical metadata calculated when applying a transform to the training dataset; for PowerTransform this consists
of lambdas, means and standard deviations for each continuous feature vector of the dataset.
Returns
-------
string
path where json file is saved on disk
"""
if self.output_path is None:
self.output_path = os.getcwd()
else:
os.makedirs(self.output_path, exist_ok=True)
self.tx_file = f"{self.output_path}/{fname}"
with open(self.tx_file, "w") as j:
if tx is None:
json.dump(self.tx_data, j)
else:
json.dump(tx, j)
self.log.info(f"TX data saved as json file: {self.tx_file}")
return self.tx_file
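# Example (illustrative; `df` and `new_df` are assumed dataframes containing the named
# columns, and the tx values below are invented):
#
#     tx = {"lambdas": [0.91, 0.13], "mu": [0.0, 0.0], "sigma": [1.0, 1.0]}
#     T = Transformer(df, cols=["n_files", "total_mb"], tx_data=tx, output_path="./models")
#     tx_file = T.save_transformer_data(fname="tx_data.json")
#     # later, e.g. in a prediction pipeline, reload the metadata from disk:
#     T2 = Transformer(new_df, cols=["n_files", "total_mb"], tx_file=tx_file)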
def continuous_data(self):
"""Store continuous feature vectors in a variable using the column names (or axis index if using numpy arrays) from
`cols` attribute.
Returns
-------
dataframe or ndarray
continuous feature vectors (as determined by `cols` attribute)
"""
if self.cols is None:
self.log.debug("`cols` attribute not instantiated.")
return None
if isinstance(self.data, pd.DataFrame):
return self.data[self.cols]
elif isinstance(self.data, np.ndarray):
return self.data[:, self.cols]
def categorical_data(self):
"""Stores the other feature vectors in a separate variable (any leftover from `data` that are not in `cols`).
Returns
-------
dataframe or ndarray
"categorical" i.e. non-continuous feature vectors (as determined by `cols` attribute)
"""
if self.cols is None:
return None
if isinstance(self.data, pd.DataFrame):
return self.data.drop(self.cols, axis=1, inplace=False)
elif isinstance(self.data, np.ndarray):
allcols = list(range(self.data.shape[1]))
cat_cols = [c for c in allcols if c not in self.cols]
return self.data[:, cat_cols]
def normalized_dataframe(self, normalized):
"""Creates a new dataframe with the normalized data. Optionally combines with non-continuous vectors (original data) and
appends `_scl` to the original column names for the ones that have been transformed.
Parameters
----------
normalized : dataframe
normalized feature vectors
Returns
-------
dataframe
dataframe of same shape as input data with continuous features normalized
"""
try:
idx = self.data.index
except AttributeError:
self.log.error(
"Non-dataframe type detected - Trying `normalized_matrix` instead."
)
return self.normalized_matrix(normalized)
if self.rename is None:
newcols = self.cols
elif isinstance(self.rename, str):
newcols = [c + self.rename for c in self.cols]
elif isinstance(self.rename, list):
newcols = self.rename
try:
data_norm = pd.DataFrame(normalized, index=idx, columns=newcols)
if self.join_data == 1:
data_norm = data_norm.join(self.categorical, how="left")
elif self.join_data == 2:
data_norm = data_norm.join(self.data, how="left")
return data_norm
except Exception as e:
self.log.error(e)
return None
def normalized_matrix(self, normalized):
"""Concatenates arrays of normalized data with original non-continuous data along the y-axis (axis=1).
Parameters
----------
normalized : numpy.ndarray
normalized data
Returns
-------
numpy.ndarray
array of same shape as input data, with continuous vectors normalized
"""
if isinstance(self.categorical, pd.DataFrame):
cat = self.categorical.values
else:
cat = self.categorical
return np.concatenate((normalized, cat), axis=1)
def normalizeX(self, normalized):
"""Combines original non-continuous features/vectors with the transformed/normalized data. Determines datatype (array or
dataframe) and calls the appropriate method.
Parameters
----------
normalized : dataframe or ndarray
normalized data
Returns
-------
ndarray or dataframe
array or dataframe of same shape and datatype as inputs, with continuous vectors/features normalized
"""
if isinstance(self.data, pd.DataFrame):
return self.normalized_dataframe(normalized)
elif isinstance(self.data, np.ndarray):
return self.normalized_matrix(normalized)
else:
self.log.error(
"Input data type not recognized - must be a dataframe or array"
)
return None
class PowerX(Transformer):
"""Applies Leo-Johnson PowerTransform (via scikit learn) normalization and scaling to continuous feature vectors of a
dataframe or numpy array. The `tx_data` attribute can be instantiated from a json file, dictionary or the input data itself.
The training and test sets should be normalized separately (i.e. distinct class objects) to prevent data leakage when
training a machine learning model. Loading the transform metadata from a json file allows you to transform a new input array
(e.g. for predictions) without needing to access the original dataframe.
Parameters
----------
Transformer : class
spacekit.preprocessor.transform.Transformer parent class
Returns
-------
PowerX class object
spacekit.preprocessor.transform.PowerX power transform subclass
"""
def __init__(
self,
data,
cols,
ncols=None,
tx_data=None,
tx_file=None,
save_tx=False,
save_as="tx_data.json",
output_path=None,
join_data=1,
rename="_scl",
**log_kws,
):
super().__init__(
data,
cols=cols,
ncols=ncols,
tx_data=tx_data,
tx_file=tx_file,
save_tx=save_tx,
join_data=join_data,
rename=rename,
output_path=output_path,
name="PowerX",
**log_kws,
)
self.fname = save_as
self.calculate_power()
self.normalized = self.apply_power_matrix()
self.Xt = super().normalizeX(self.normalized)
def fitX(self):
"""Instantiates a scikit-learn PowerTransformer object and fits to the input data. If `tx_data` was passed as a kwarg or
loaded from `tx_file`, the lambdas attribute of the transformer object will be updated to use these instead of
the values calculated during the fit.
Returns
-------
PowerTransformer object
transformer fit to the data
"""
self.transformer = PowerTransformer(standardize=False).fit(self.continuous)
self.transformer.lambdas_ = self.get_lambdas()
return self.transformer
def get_lambdas(self):
"""Instantiates the lambdas from file or dictionary if passed as kwargs; otherwise it uses the lambdas calculated in the
transformX method. If transformX has not been called yet, returns None.
Returns
-------
ndarray or float
transform of multiple feature vectors returns an array of lambda values; otherwise a single vector returns a single
(float) value.
"""
if self.tx_data is not None:
return self.tx_data["lambdas"]
return self.transformer.lambdas_
def transformX(self):
"""Applies a scikit-learn PowerTransform on the input data.
Returns
-------
ndarray
continuous feature vectors transformed via scikit-learn PowerTransform
"""
return self.transformer.transform(self.continuous)
def calculate_power(self):
"""Fits and transforms the continuous feature vectors using scikit learn PowerTransform. Calculates zero mean and unit
variance for each vector as a separate step and stores these along with the lambdas in a dictionary `tx_data` attribute.
This is so that the same normalization can be applied later for prediction inputs without requiring the original training
data - otherwise it would be the same as using PowerTransform(standardize=True). Optionally, the calculated transform
data can be stored in a json file on local disk.
Returns
-------
self
spacekit.preprocessor.transform.PowerX object with transformation metadata calculated for the input data and stored
as attributes.
"""
self.transformer = self.fitX()
self.input_matrix = self.transformX()
if self.tx_data is None:
mu, sig = [], []
for i in range(len(self.cols)):
# normalized[:, i] = (v - m) / s
mu.append(np.mean(self.input_matrix[:, i]))
sig.append(np.std(self.input_matrix[:, i]))
self.tx_data = {
"lambdas": self.get_lambdas(),
"mu": np.asarray(mu),
"sigma": np.asarray(sig),
}
if self.save_tx is True:
tx2 = {}
for k, v in self.tx_data.items():
tx2[k] = list(v)
_ = super().save_transformer_data(tx=tx2, fname=self.fname)
del tx2
return self
def apply_power_matrix(self):
"""Transforms the input data. This method assumes we already have `tx_data` and a fit-transformed input_matrix (array of
continuous feature vectors), which normally is done automatically when the class object is instantiated and
`calculate_power` is called.
Returns
-------
ndarray
power transformed continuous feature vectors
"""
xrow = self.continuous.shape[0]
xcol = self.continuous.shape[1]
self.normalized = np.empty((xrow, xcol))
for i in range(xcol):
v = self.input_matrix[:, i]
m = self.tx_data["mu"][i]
s = self.tx_data["sigma"][i]
self.normalized[:, i] = np.round((v - m) / s, 5)
return self.normalized
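# Example (illustrative sketch; `df` is an assumed training dataframe and `new_array` an
# assumed 2-column numpy array of new inputs):
#
#     Px = PowerX(df, cols=["n_files", "total_mb"], save_tx=True, output_path="./models")
#     X_scaled = Px.Xt  # same shape as df, with the named columns normalized
#     # normalize new inputs later using only the saved metadata (no training data needed)
#     X_new = PowerX(new_array, cols=[0, 1], ncols=[0, 1], tx_data=Px.tx_data).Xt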
def normalize_training_data(
df, cols, X_train, X_test, X_val=None, rename=None, output_path=None
):
"""Apply Leo-Johnson PowerTransform (via scikit learn) normalization and scaling to the training data, saving the transform
metadata to json file on local disk and transforming the train, test and val sets separately (to prevent data leakage).
Parameters
----------
df : pandas dataframe
training dataset
cols: list
column names or array index values of feature vectors to be transformed (i.e. continuous datatype features)
X_train : ndarray
training set feature inputs array
X_test : ndarray
test set feature inputs array
X_val : ndarray, optional
validation set inputs array, by default None
rename : str or list, optional
suffix or list of new names for the normalized columns, by default None
output_path : string, optional
where to save the transform metadata, by default None (current working directory)
Returns
-------
ndarrays
normalized and scaled training, test, and validation sets
"""
print("Applying Normalization (Leo-Johnson PowerTransform)")
ncols = [i for i, c in enumerate(df.columns) if c in cols]
Px = PowerX(
df, cols=cols, ncols=ncols, save_tx=True, rename=rename, output_path=output_path
)
X_train = PowerX(
X_train, cols=cols, ncols=ncols, rename=rename, tx_data=Px.tx_data
).Xt
X_test = PowerX(
X_test, cols=cols, ncols=ncols, rename=rename, tx_data=Px.tx_data
).Xt
if X_val is not None:
X_val = PowerX(
X_val, cols=cols, ncols=ncols, rename=rename, tx_data=Px.tx_data
).Xt
return X_train, X_test, X_val
else:
return X_train, X_test
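# Example (illustrative; assumes `df` is the training dataframe and X_train/X_test/X_val
# are arrays whose columns line up with df.columns):
#
#     cols = ["n_files", "total_mb"]
#     X_train, X_test, X_val = normalize_training_data(
#         df, cols, X_train, X_test, X_val=X_val, output_path="./models"
#     )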
def normalize_training_images(X_tr, X_ts, X_vl=None):
"""Scale image inputs so that all pixel values are converted to a decimal between 0 and 1 (divide by 255).
Parameters
----------
X_tr : ndarray
training set images
X_ts : ndarray
test set images
X_vl : ndarray, optional
validation set images, by default None
Returns
-------
ndarrays
image set arrays
"""
X_tr /= 255.0
X_ts /= 255.0
if X_vl is not None:
X_vl /= 255.0
return X_tr, X_ts, X_vl
else:
return X_tr, X_ts
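# Example (illustrative): the in-place division requires float arrays, so cast first if
# the image batches were loaded as 8-bit integers.
#
#     X_tr, X_ts = normalize_training_images(X_tr.astype("float32"), X_ts.astype("float32"))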
def array_to_tensor(arr, reshape=False, shape=(-1, 1)):
if isinstance(arr, tf.Tensor):
return arr
if reshape is True:
arr = arr.reshape(shape[0], shape[1])
return tf.convert_to_tensor(arr, dtype=tf.float32)
def y_tensors(y_train, y_test, reshape=True):
y_train = array_to_tensor(y_train, reshape=reshape)
y_test = array_to_tensor(y_test, reshape=reshape)
return y_train, y_test
def X_tensors(X_train, X_test):
X_train = array_to_tensor(X_train)
X_test = array_to_tensor(X_test)
return X_train, X_test
def arrays_to_tensors(X_train, y_train, X_test, y_test, reshape_y=False):
"""Converts multiple numpy arrays into tensorflow tensor datatypes at once (for convenience).
Parameters
----------
X_train : ndarray
input training features
y_train : ndarray
training target values
X_test : ndarray
input test features
y_test : ndarray
test target values
Returns
-------
tensorflow.tensors
X_train, y_train, X_test, y_test
"""
X_train = array_to_tensor(X_train)
y_train = array_to_tensor(y_train, reshape=reshape_y)
X_test = array_to_tensor(X_test)
y_test = array_to_tensor(y_test, reshape=reshape_y)
return X_train, y_train, X_test, y_test
def tensor_to_array(tensor, reshape=False, shape=(-1, 1)):
"""Convert a tensor back into a numpy array. Optionally reshape the array (e.g. for target class data).
Parameters
----------
tensor : tensor
tensorflow tensor object
reshape : bool, optional
reshapes the array (-1, 1) using numpy, by default False
Returns
-------
ndarray
array of same shape as input tensor, unless reshape=True
"""
if reshape:
return np.asarray(tensor).reshape(shape[0], shape[1])
else:
return np.asarray(tensor)
def tensors_to_arrays(X_train, y_train, X_test, y_test):
"""Converts tensors into arrays, which is necessary for certain regression analysis computations. The y_train and y_test args
are reshaped using numpy.reshape(-1, 1).
Parameters
----------
X_train : tensor
training feature inputs
y_train : tensor
training target outputs
X_test : tensor
test feature inputs
y_test : tensor
test target outputs
Returns
-------
numpy.ndarrays
X_train, y_train, X_test, y_test
"""
X_train = tensor_to_array(X_train)
y_train = tensor_to_array(y_train, reshape=True)
X_test = tensor_to_array(X_test)
y_test = tensor_to_array(y_test, reshape=True)
return X_train, y_train, X_test, y_test
def hypersonic_pliers(
path_to_train, path_to_test, y_col=[0], skip=1, dlm=",", subtract_y=0.0
):
"""Extracts data into 1-dimensional arrays, using separate target classes (y) for training and test data. Assumes y (target)
is first column in dataframe. If the target (y) classes in the raw data are 1 and 2, but you'd like them to be binaries (0
and 1), set subtract_y=1.0
Parameters
----------
path_to_train : string
path to training data file (csv)
path_to_test : string
path to test data file (csv)
y_col : list, optional
axis index of target class, by default [0]
skip : int, optional
skiprows parameter for np.loadtxt, by default 1
dlm : str, optional
delimiter, by default ","
subtract_y : float, optional
subtract this value from all y-values, by default 0.0
Returns
-------
np.ndarrays
X_train, X_test, y_train, y_test
"""
Train = np.loadtxt(path_to_train, skiprows=skip, delimiter=dlm)
cols = list(range(Train.shape[1]))
xcols = [c for c in cols if c not in y_col]
# X_train = Train[:, 1:]
X_train = Train[:, xcols]
# y_train = Train[:, 0, np.newaxis] - subtract_y
y_train = Train[:, y_col, np.newaxis] - subtract_y
Test = np.loadtxt(path_to_test, skiprows=skip, delimiter=dlm)
X_test = Test[:, xcols]
y_test = Test[:, y_col, np.newaxis] - subtract_y
# X_test = Test[:, 1:]
# y_test = Test[:, 0, np.newaxis] - subtract_y
del Train, Test
print("X_train: ", X_train.shape)
print("y_train: ", y_train.shape)
print("X_test: ", X_test.shape)
print("y_test: ", y_test.shape)
return X_train, X_test, y_train, y_test
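# Example (illustrative; the csv paths are placeholders): raw labels of 1 and 2 are
# remapped to 0/1 by setting subtract_y=1.0.
#
#     X_train, X_test, y_train, y_test = hypersonic_pliers(
#         "data/exoTrain.csv", "data/exoTest.csv", y_col=[0], subtract_y=1.0
#     )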
def thermo_fusion_chisel(matrix1, matrix2=None):
"""Scales each vector of a 2d array (``matrix``) to zero mean and unit variance. The second (optional) matrix is to perform
the same scaling on a separate set of inputs, e.g. train and test data. Note - normalization should be done separately to
prevent data leakage in model training, hence the matrix2 kwarg.
Parameters
----------
matrix1 : ndarray
input feature vectors to be scaled
matrix2 : ndarray, optional
second input feature vectors to be scaled, by default None
Returns
-------
ndarray(s)
scaled array(s) of same shape as input
"""
matrix1 = (matrix1 - np.mean(matrix1, axis=1).reshape(-1, 1)) / np.std(
matrix1, axis=1
).reshape(-1, 1)
print("Mean: ", matrix1[0].mean())
print("Variance: ", matrix1[0].std())
if matrix2 is not None:
matrix2 = (matrix2 - np.mean(matrix2, axis=1).reshape(-1, 1)) / np.std(
matrix2, axis=1
).reshape(-1, 1)
print("Mean: ", matrix2[0].mean())
print("Variance: ", matrix2[0].std())
return matrix1, matrix2
else:
return matrix1
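# Example (illustrative): each row is scaled independently to ~zero mean and unit variance.
#
#     m = np.array([[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]])
#     scaled = thermo_fusion_chisel(m)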
def babel_fish_dispenser(matrix1, matrix2=None, step_size=None, axis=2):
"""Adds an input corresponding to the running average over a set number of time steps. This helps the neural network to
ignore high frequency noise by passing in a uniform 1-D filter and stacking the arrays.
Parameters
----------
matrix1 : numpy array
e.g. X_train
matrix2 : numpy array, optional
e.g. X_test, by default None
step_size : int, optional
window size (time steps) for the 1D uniform filter, by default None (200 is used when not set)
axis : int, optional
which axis to stack the arrays, by default 2
Returns
-------
numpy array(s)
input array(s) stacked with their running averages along a new axis (e.g. a 2D input becomes 3D)
"""
if step_size is None:
step_size = 200
# calc input for flux signal rolling avgs
filter1 = uniform_filter1d(matrix1, axis=1, size=step_size)
# store in array and stack on 2nd axis for each obs of X data
matrix1 = np.stack([matrix1, filter1], axis=axis)
if matrix2 is not None:
filter2 = uniform_filter1d(matrix2, axis=1, size=step_size)
matrix2 = np.stack([matrix2, filter2], axis=axis)
print(matrix1.shape, matrix2.shape)
return matrix1, matrix2
else:
print(matrix1.shape)
return matrix1
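# Example (illustrative): stacking the running average adds a channel axis, so an X_train
# of shape (n_samples, n_timesteps) becomes (n_samples, n_timesteps, 2).
#
#     X_train, X_test = babel_fish_dispenser(X_train, X_test, step_size=200)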
def fast_fourier(matrix, bins):
"""Takes an array (e.g. signal input values) and rotates number of ``bins`` to the left as a fast Fourier transform. Returns
vector of length equal to ``matrix`` input array.
Parameters
----------
matrix : ndarray
input values to transform
bins : int
number of samples to rotate (shift) each row by
Returns
-------
ndarray
array of the same shape as ``matrix`` with each row shifted by ``bins`` samples
"""
shape = matrix.shape
fourier_matrix = np.zeros(shape, dtype=float)
for i, row in enumerate(matrix):
signal = np.asarray(row)
frequency = np.arange(signal.size / 2 + 1, dtype=float)
phase = np.exp(
complex(0.0, (2.0 * np.pi)) * frequency * bins / float(signal.size)
)
ft = np.fft.irfft(phase * np.fft.rfft(signal))
fourier_matrix[i] = ft
return fourier_matrix
# for backward compatibility with HSTCAL (planned deprecation)
# def update_power_transform(df):
# pt = PowerTransformer(standardize=False)
# df_cont = df[["n_files", "total_mb"]]
# pt.fit(df_cont)
# input_matrix = pt.transform(df_cont)
# # FILES (n_files)
# f_mean = np.mean(input_matrix[:, 0])
# f_sigma = np.std(input_matrix[:, 0])
# # SIZE (total_mb)
# s_mean = np.mean(input_matrix[:, 1])
# s_sigma = np.std(input_matrix[:, 1])
# files = input_matrix[:, 0]
# size = input_matrix[:, 1]
# x_files = (files - f_mean) / f_sigma
# x_size = (size - s_mean) / s_sigma
# normalized = np.stack([x_files, x_size], axis=1)
# idx = df_cont.index
# df_norm = pd.DataFrame(normalized, index=idx, columns=["x_files", "x_size"])
# df["x_files"] = df_norm["x_files"]
# df["x_size"] = df_norm["x_size"]
# lambdas = pt.lambdas_
# pt_transform = {
# "f_lambda": lambdas[0],
# "s_lambda": lambdas[1],
# "f_mean": f_mean,
# "f_sigma": f_sigma,
# "s_mean": s_mean,
# "s_sigma": s_sigma,
# }
# print(pt_transform)
# return df, pt_transform