Source code for spacekit.builder.architect

import os
import sys
import importlib.resources
from zipfile import ZipFile
import numpy as np
import time
import datetime as dt
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras import optimizers, callbacks
from tensorflow.keras.layers import (
    Dense,
    Input,
    concatenate,
    Conv1D,
    MaxPool1D,
    Dropout,
    Flatten,
    BatchNormalization,
    Conv3D,
    MaxPool3D,
    GlobalAveragePooling3D,
)
from tensorflow.keras.metrics import RootMeanSquaredError as RMSE
from spacekit.generator.augment import augment_data, augment_image
from spacekit.analyzer.track import stopwatch
from spacekit.builder.blueprints import Blueprint
from spacekit.logger.log import Logger

TF_CPP_MIN_LOG_LEVEL = 2


class Builder:
    """Class for building and training a neural network."""

    def __init__(
        self,
        train_data=None,
        test_data=None,
        blueprint=None,
        algorithm=None,
        model_path=None,
        name=None,
        logname="Builder",
        **log_kws,
    ):
        # default to None so attribute checks such as `self.X_test is not None`
        # are safe when no data is passed in
        self.X_train, self.y_train = train_data if train_data is not None else (None, None)
        self.X_test, self.y_test = test_data if test_data is not None else (None, None)
        self.blueprint = blueprint
        self.algorithm = algorithm
        self.model_path = model_path
        self.name = name
        self.batch_size = 32
        self.epochs = 60
        self.lr = 1e-4
        self.decay = [100000, 0.96]
        self.early_stopping = None
        self.callbacks = None
        self.verbose = 2
        self.ensemble = False
        self.model = None
        self.mlp = None
        self.cnn = None
        self.ann = None
        self.step_size = None
        self.steps_per_epoch = None
        self.batch_maker = None
        self.history = None
        self.tx_file = None
        self.__name__ = logname
        self.log = Logger(self.__name__, **log_kws).spacekit_logger()

    def load_saved_model(
        self,
        arch=None,
        compile_params=None,
        custom_obj=None,
        extract_to="models",
        keras_archive=True,
    ):
        """Load a saved Keras model from local disk (located at the ``model_path``
        attribute) or a pre-trained model from ``spacekit.builder.trained_networks``
        (if the ``model_path`` attribute is None).

        Example for ``compile_params``::

            dict(
                loss="binary_crossentropy",
                metrics=["accuracy"],
                optimizer=Adam(
                    learning_rate=optimizers.schedules.ExponentialDecay(
                        1e-4, decay_steps=100000, decay_rate=0.96, staircase=True
                    )
                ),
            )

        Parameters
        ----------
        arch : str, optional
            select a pre-trained model from the spacekit library ("svm_align",
            "jwst_cal", or "hst_cal"), by default None
        compile_params : dict, optional
            compile the model using these keyword arguments, by default None
        custom_obj : dict, optional
            custom objects to pass into Keras ``load_model``, by default None
            (an empty dict)
        extract_to : str or path, optional
            directory location to extract into, by default "models"
        keras_archive : bool, optional
            for models saved in the newer high-level .keras compressed archive
            format (recommended), by default True

        Returns
        -------
        keras.Model
            pre-trained (and/or compiled) functional Keras model
        """
        # avoid a mutable default argument
        custom_obj = custom_obj if custom_obj is not None else {}
        if self.model_path is None:
            self.load_pretrained_network(arch=arch)
        if str(self.model_path).split(".")[-1] == "zip":
            self.unzip_model_files(extract_to=extract_to)
        model_basename = os.path.basename(self.model_path.rstrip("/"))
        if model_basename != self.name:  # for spacekit archives, this is always True
            for root, dirs, files in os.walk(self.model_path):
                if keras_archive is True:
                    for f in files:
                        if f == f"{self.name}.keras":
                            self.model_path = os.path.join(root, f)
                            break
                else:  # legacy SavedModel folder containing assets, variables and saved_model.pb
                    for d in dirs:
                        if d == self.name:
                            self.model_path = os.path.join(root, d)
        self.log.debug(f"Loading saved model from {str(self.model_path)}")
        try:
            self.model = load_model(self.model_path, custom_objects=custom_obj)
            if compile_params:
                self.model = Model(inputs=self.model.inputs, outputs=self.model.outputs)
                self.model.compile(**compile_params)
        except Exception as e:
            self.log.error(e)
        return self.model

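    # Usage sketch (illustrative): load one of the bundled pre-trained networks
    # when no local model_path is set. The arch/name pairing and the predict
    # call are assumptions based on the signatures above:
    #
    #   builder = Builder(name="mem_clf")
    #   model = builder.load_saved_model(arch="hst_cal")
    #   # model.predict(X)  # X: preprocessed inputs matching the network
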
    def load_pretrained_network(self, arch=None):
        mission_blueprints = ["hst", "jwst"]
        err = None
        if arch is None:
            err = "Must specify a spacekit pretrained NN using `arch` when the model_path attribute is not set."
        elif arch not in ["hst_cal", "jwst_cal", "svm_align"]:
            err = f"The spacekit pretrained NN archive named {arch} does not exist."
        if err:
            self.log.error(err)
            sys.exit(1)
        model_src = "spacekit.builder.trained_networks"
        archive_file = f"{arch}.zip"  # hst_cal.zip | jwst_cal.zip | svm_align.zip
        with importlib.resources.path(model_src, archive_file) as mod:
            self.model_path = mod
        if self.blueprint is None:
            mission_arch = arch.split("_")[0]
            if mission_arch in mission_blueprints:
                self.blueprint = f"{mission_arch}_{self.name}"
            elif mission_arch == "svm":
                self.blueprint = "ensemble"

    def unzip_model_files(self, extract_to="models"):
        """Extracts a Keras model object from a zip archive.

        Parameters
        ----------
        extract_to : str, optional
            directory location to extract into, by default "models"

        Returns
        -------
        str
            path to where the model archive has been extracted
        """
        os.makedirs(extract_to, exist_ok=True)
        model_base = os.path.basename(self.model_path).split(".")[0]
        with ZipFile(self.model_path, "r") as zip_ref:
            zip_ref.extractall(extract_to)
        self.model_path = os.path.join(extract_to, model_base)
        return self.model_path

    def find_tx_file(self, name="tx_data.json"):
        if not self.model_path:
            return
        for root, _, files in os.walk(os.path.dirname(self.model_path)):
            for f in files:
                if f == name:
                    self.tx_file = os.path.join(root, f)
                    break
        # fall back to the model directory itself; guard against tx_file
        # still being None (os.path.exists(None) raises a TypeError)
        if self.tx_file is None or not os.path.exists(self.tx_file):
            self.tx_file = os.path.join(self.model_path, name)

    def set_build_params(
        self,
        input_shape=None,
        output_shape=None,
        layers=None,
        kernel_size=None,
        activation=None,
        cost_function=None,
        strides=None,
        optimizer=None,
        lr_sched=None,
        loss=None,
        metrics=None,
        input_name=None,
        output_name=None,
        name=None,
        algorithm=None,
    ):
        """Set custom build parameters for a Builder object.

        Parameters
        ----------
        input_shape : tuple, optional
            shape of the inputs, by default None
        output_shape : tuple, optional
            shape of the output, by default None
        layers : list, optional
            sizes of hidden (dense) layers, by default None
        kernel_size : int, optional
            size of the kernel, by default None
        activation : str, optional
            dense layer activation, by default None
        cost_function : str, optional
            function to update weights (calculate cost), by default None
        strides : int, optional
            number of strides, by default None
        optimizer : object, optional
            type of optimizer to use, by default None
        lr_sched : bool, optional
            use a learning_rate schedule such as ExponentialDecay, by default None
        loss : str, optional
            loss metric to monitor, by default None
        metrics : list, optional
            metrics for model to train on, by default None
        input_name : str, optional
            name of the input layer, by default None
        output_name : str, optional
            name of the output layer, by default None
        name : str, optional
            name of the model, by default None
        algorithm : str, optional
            analysis type, used for determining the spacekit.analyzer.Compute
            class, e.g. "linreg" for linear regression or "multiclass" for
            multi-label classification, by default None

        Returns
        -------
        self
            spacekit.builder.architect.Builder class object with updated attributes
        """
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.layers = layers
        self.kernel_size = kernel_size
        self.activation = activation
        self.cost_function = cost_function
        self.strides = strides
        self.optimizer = optimizer
        self.lr_sched = lr_sched
        self.loss = loss
        self.metrics = metrics
        self.algorithm = algorithm
        self.input_name = input_name
        self.output_name = output_name
        self.name = name
        return self

    def fit_params(
        self,
        batch_size=32,
        epochs=60,
        lr=1e-4,
        decay=[100000, 0.96],
        early_stopping=None,
        verbose=2,
        ensemble=False,
    ):
        """Set custom model fitting parameters as Builder object attributes.

        Parameters
        ----------
        batch_size : int, optional
            size of each training batch, by default 32
        epochs : int, optional
            number of epochs, by default 60
        lr : float, optional
            initial learning rate, by default 1e-4
        decay : list, optional
            [decay_steps, decay_rate], by default [100000, 0.96]
        early_stopping : str, optional
            use an early stopping callback, by default None
        verbose : int, optional
            set the verbosity level, by default 2
        ensemble : bool, optional
            ensemble type network, by default False

        Returns
        -------
        self
            spacekit.builder.architect.Builder class object with updated fitting parameters
        """
        self.batch_size = batch_size
        self.epochs = epochs
        self.lr = lr
        self.decay = decay
        self.early_stopping = early_stopping
        self.verbose = verbose
        self.ensemble = ensemble
        return self

    def get_blueprint(self, architecture, fitting=True):
        draft = Blueprint(architecture=architecture)
        self.set_build_params(**draft.building())
        if fitting is True:
            self.fit_params(**draft.fitting())
        return self

    def decay_learning_rate(self):
        """Set a learning rate schedule with exponential decay.

        Returns
        -------
        keras.optimizers.schedules.ExponentialDecay
            exponential decay learning rate schedule
        """
        lr_schedule = optimizers.schedules.ExponentialDecay(
            self.lr, decay_steps=self.decay[0], decay_rate=self.decay[1], staircase=True
        )
        return lr_schedule

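    # With staircase=True, the schedule is a step function of the batch step:
    #
    #   lr(step) = lr * decay_rate ** (step // decay_steps)
    #
    # e.g. with the defaults lr=1e-4 and decay=[100000, 0.96], the learning
    # rate drops to 9.6e-5 after 100,000 steps and 9.216e-5 after 200,000.
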
    def set_callbacks(self, patience=15):
        """Set an early stopping callback by monitoring the model training for
        either accuracy or loss. For classifiers, use 'val_accuracy' or
        'val_loss'. For regression, use 'val_loss' or 'val_rmse'.

        Parameters
        ----------
        patience : int, optional
            number of epochs with no improvement before stopping, by default 15

        Returns
        -------
        list
            [callbacks.ModelCheckpoint, callbacks.EarlyStopping]
        """
        model_name = str(self.model.name)
        checkpoint_cb = callbacks.ModelCheckpoint(
            f"{model_name}_checkpoint.h5", save_best_only=True
        )
        early_stopping_cb = callbacks.EarlyStopping(
            monitor=self.early_stopping, patience=patience
        )
        self.callbacks = [checkpoint_cb, early_stopping_cb]
        return self.callbacks

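    # Usage sketch: set the monitor value before fitting; set_callbacks() is
    # then invoked automatically inside fit()/batch_fit(). The monitor choice
    # is an assumption; pick one matching your compiled metrics:
    #
    #   builder.early_stopping = "val_loss"
    #   builder.fit()
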
    def save_keras_model(self, model_path):
        dpath = os.path.dirname(model_path)
        os.makedirs(dpath, exist_ok=True)
        name = os.path.basename(model_path)
        if not name.endswith(".keras"):
            name += ".keras"
        keras_model_path = os.path.join(dpath, name)
        self.model.save(keras_model_path)
        self.model_path = keras_model_path

    def save_model(self, weights=True, output_path=".", keras_archive=True, parent_dir=""):
        """Saves the model and, optionally, its weights. With ``keras_archive=True``
        the model is stored as a single compressed .keras archive; otherwise the
        legacy SavedModel format is used, where the architecture and training
        configuration (including the optimizer, losses, and metrics) are stored
        in saved_model.pb and the weights in the variables/ directory.

        Parameters
        ----------
        weights : bool, optional
            also save the weights learned by the model separately (legacy
            format only), by default True
        output_path : str, optional
            where to save the model files, by default "."
        keras_archive : bool, optional
            save model using the new (preferred) keras archive format, by default True
        parent_dir : str, optional
            optional subdirectory under "models", by default ""
        """
        if self.name is None:
            self.name = str(self.model.name)
            datestamp = dt.datetime.now().isoformat().split("T")[0]
            model_name = f"{self.name}_{datestamp}"
        else:
            model_name = self.name
        model_path = os.path.join(output_path, "models", parent_dir, model_name)
        if keras_archive is True:
            self.save_keras_model(model_path)
        else:
            self.model.save(model_path)
            if weights is True:
                weights_path = os.path.join(model_path, "weights", "ckpt")
                self.model.save_weights(weights_path)
            # print a tree of the saved model directory
            for root, _, files in os.walk(model_path):
                indent = " " * root.count(os.sep)
                print("{}{}/".format(indent, os.path.basename(root)))
                for filename in files:
                    print("{}{}".format(indent + " ", filename))
            self.model_path = model_path

    def model_diagram(
        self,
        model=None,
        output_path=None,
        show_shapes=True,
        show_dtype=False,
        LR=False,
        expand_nested=True,
        show_layer_names=False,
    ):
        rank = "LR" if LR is True else "TB"
        if model is None:
            model = self.model
        if output_path is None:
            output_path = os.getcwd()
        try:
            from tensorflow.keras.utils import plot_model  # requires pydot, graphviz

            plot_model(
                model,
                to_file=f"{output_path}/{model.name}.png",
                show_shapes=show_shapes,
                show_dtype=show_dtype,
                show_layer_names=show_layer_names,
                rankdir=rank,
                expand_nested=expand_nested,
                dpi=96,
                layer_range=None,
            )
        except ImportError:
            self.log.error(
                "pydot and graphviz not installed: `pip install spacekit[viz]`"
            )

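    # Usage sketch (requires the optional pydot/graphviz dependencies):
    #
    #   builder.model_diagram(output_path="plots", LR=True)
    #   # writes plots/<model.name>.png
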
    # TODO
    # def timer(self, func, model_name):
    #     def wrap():
    #         start = time.time()
    #         stopwatch(f"TRAINING ***{model_name}***", t0=start)
    #         func()
    #         end = time.time()
    #         stopwatch(f"TRAINING ***{model_name}***", t0=start, t1=end)
    #         return func
    #     return wrap

    # @timer
    def batch_fit(self):
        """Fits the CNN using a batch generator that draws an equal number of
        positive and negative samples, rotating randomly.

        Returns
        -------
        tf.keras.callbacks.History
            Keras training history
        """
        model_name = str(self.model.name).upper()
        self.log.info("FITTING MODEL...")
        validation_data = (
            (self.X_test, self.y_test) if self.X_test is not None else None
        )
        if self.early_stopping is not None:
            self.callbacks = self.set_callbacks()
        start = time.time()
        stopwatch(f"TRAINING ***{model_name}***", t0=start)
        if not self.steps_per_epoch:  # covers both None and 0
            self.steps_per_epoch = 1
        self.history = self.model.fit(
            self.batch_maker(),
            validation_data=validation_data,
            verbose=self.verbose,
            epochs=self.epochs,
            steps_per_epoch=self.steps_per_epoch,
            callbacks=self.callbacks,
        )
        end = time.time()
        stopwatch(f"TRAINING ***{model_name}***", t0=start, t1=end)
        self.model.summary()
        return self.history

    def fit(self, params=None):
        """Fit a model to the training data.

        Parameters
        ----------
        params : dict, optional
            set custom fit params, by default None

        Returns
        -------
        tf.keras.callbacks.History
            Keras training history object
        """
        if params is not None:
            self.fit_params(**params)
        model_name = str(self.model.name).upper()
        self.log.info("FITTING MODEL...")
        validation_data = (
            (self.X_test, self.y_test) if self.X_test is not None else None
        )
        if self.early_stopping is not None:
            self.callbacks = self.set_callbacks()
        start = time.time()
        stopwatch(f"TRAINING ***{model_name}***", t0=start)
        self.history = self.model.fit(
            self.X_train,
            self.y_train,
            batch_size=self.batch_size,
            validation_data=validation_data,
            verbose=self.verbose,
            epochs=self.epochs,
            callbacks=self.callbacks,
        )
        end = time.time()
        stopwatch(f"TRAINING ***{model_name}***", t0=start, t1=end)
        self.model.summary()
        return self.history

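    # End-to-end sketch using the BuilderMLP subclass defined below (array
    # shapes here are illustrative assumptions, not project data):
    #
    #   X = np.random.rand(640, 9).astype("float32")
    #   y = np.random.randint(0, 2, (640, 1)).astype("float32")
    #   blder = BuilderMLP(X_train=X, y_train=y)
    #   blder.build()
    #   blder.fit()
    #   blder.save_model(output_path=".")
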
class BuilderMLP(Builder):
    """Subclass for building and training MLP neural networks.

    Parameters
    ----------
    Builder : class
        spacekit.builder.architect.Builder class object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        blueprint="mlp",
        **builder_kwargs,
    ):
        train_data = (
            (X_train, y_train) if X_train is not None and y_train is not None else None
        )
        test_data = (
            (X_test, y_test) if X_test is not None and y_test is not None else None
        )
        # setdefault lets subclasses (e.g. MemoryClassifier) pass their own logname
        builder_kwargs.setdefault("logname", "BuilderMLP")
        super().__init__(
            train_data=train_data,
            test_data=test_data,
            **builder_kwargs,
        )
        self.blueprint = blueprint
        self.name = "sequential_mlp"
        self.input_shape = X_train.shape[1] if X_train is not None else None
        self.output_shape = 1
        self.layers = [18, 32, 64, 32, 18]
        self.input_name = "mlp_inputs"
        self.output_name = "mlp_output"
        self.activation = "leaky_relu"
        self.cost_function = "sigmoid"
        self.lr_sched = True
        self.optimizer = Adam
        self.loss = "binary_crossentropy"
        self.metrics = ["accuracy"]
        self.step_size = X_train.shape[0] if X_train is not None else self.batch_size
        self.steps_per_epoch = self.step_size // self.batch_size
        self.batch_maker = self.batch

    def build(self):
        """Build and compile an MLP network.

        Returns
        -------
        tf.keras.Model
            compiled model object
        """
        # visible layer
        inputs = Input(shape=(self.input_shape,), name=self.input_name)
        # hidden layers
        x = Dense(
            self.layers[0], activation=self.activation, name=f"1_dense{self.layers[0]}"
        )(inputs)
        for i, layer in enumerate(self.layers[1:], start=2):
            x = Dense(layer, activation=self.activation, name=f"{i}_dense{layer}")(x)
        # output layer
        if self.blueprint == "ensemble":
            self.mlp = Model(inputs, x, name="mlp_ensemble")
            return self.mlp
        outputs = Dense(
            self.output_shape, activation=self.cost_function, name=self.output_name
        )(x)
        self.model = Model(inputs=inputs, outputs=outputs, name=self.name)
        lr_schedule = self.decay_learning_rate() if self.lr_sched is True else self.lr
        self.model.compile(
            loss=self.loss,
            optimizer=self.optimizer(learning_rate=lr_schedule),
            metrics=self.metrics,
        )
        return self.model

    def batch(self):
        """Rotates through positive and negative samples randomly and
        indefinitely, yielding batches with an equal number of each class.
        An epoch finishes once ``steps_per_epoch`` batches have been seen
        by the model.

        Yields
        ------
        tuple
            a single batch tuple of (inputs, targets)
        """
        # hb: half-batch
        hb = self.batch_size // 2
        # new arrays of the batch shape, without initializing entries
        xb = np.empty((self.batch_size, self.X_train.shape[1]), dtype="float32")
        # e.g. y_train.shape = (2016, 1)
        yb = np.empty((self.batch_size, self.y_train.shape[1]), dtype="float32")
        pos = np.where(self.y_train[:, 0] == 1.0)[0]
        neg = np.where(self.y_train[:, 0] == 0.0)[0]
        # rotate through the samples randomly
        while True:
            np.random.shuffle(pos)
            np.random.shuffle(neg)
            xb[:hb] = self.X_train[pos[:hb]]
            xb[hb:] = self.X_train[neg[hb : self.batch_size]]
            yb[:hb] = self.y_train[pos[:hb]]
            yb[hb:] = self.y_train[neg[hb : self.batch_size]]
            for i in range(self.batch_size):
                xb[i] = augment_data(xb[i])
            yield xb, yb

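    # The generator pairs with batch_fit(): each epoch consumes
    # steps_per_epoch = X_train.shape[0] // batch_size balanced batches.
    # Inspecting a single batch directly (given a built `blder` instance as
    # in the sketch above; shapes assume the default batch_size of 32):
    #
    #   gen = blder.batch()
    #   xb, yb = next(gen)  # xb: (32, n_features), yb: (32, 1)
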
class BuilderCNN3D(Builder):
    """Subclass for building and training 3D convolutional neural networks.

    Parameters
    ----------
    Builder : class
        spacekit.builder.architect.Builder class object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        blueprint="cnn3d",
        **builder_kwargs,
    ):
        train_data = (
            (X_train, y_train) if X_train is not None and y_train is not None else None
        )
        test_data = (
            (X_test, y_test) if X_test is not None and y_test is not None else None
        )
        super().__init__(
            train_data=train_data,
            test_data=test_data,
            logname="BuilderCNN3D",
            **builder_kwargs,
        )
        self.blueprint = blueprint
        self.input_shape = X_train.shape[1:] if X_train is not None else None
        self.output_shape = 1
        self.data_format = "channels_last"
        self.input_name = "cnn3d_inputs"
        self.output_name = "cnn3d_output"
        self.activation = "leaky_relu"
        self.cost_function = "sigmoid"
        self.loss = "binary_crossentropy"
        self.optimizer = Adam
        self.lr_sched = True
        self.metrics = ["accuracy"]
        self.name = "cnn3d"
        self.kernel = 3
        self.padding = "same"
        self.filters = [32, 64, 128, 256]
        self.pool = 1
        self.dense = 512
        self.dropout = 0.3
        self.step_size = (
            self.X_train.shape[0] if self.X_train is not None else self.batch_size
        )
        self.steps_per_epoch = self.step_size // self.batch_size
        self.batch_maker = self.batch

    def build(self):
        """Builds a 3D convolutional neural network for RGB image triplets."""
        inputs = Input(self.input_shape, name=self.input_name)
        x = Conv3D(
            filters=32,
            kernel_size=self.kernel,
            padding=self.padding,
            data_format=self.data_format,
            activation=self.activation,
        )(inputs)
        x = MaxPool3D(pool_size=2)(x)
        x = BatchNormalization()(x)
        for f in self.filters:
            x = Conv3D(
                filters=f,
                kernel_size=self.kernel,
                padding=self.padding,
                activation=self.activation,
            )(x)
            x = MaxPool3D(pool_size=self.pool)(x)
            x = BatchNormalization()(x)
        x = GlobalAveragePooling3D()(x)
        x = Dense(units=self.dense, activation=self.activation)(x)
        x = Dropout(self.dropout)(x)
        if self.blueprint == "ensemble":
            self.cnn = Model(inputs, x, name="cnn_ensemble")
            return self.cnn
        outputs = Dense(
            units=self.output_shape,
            activation=self.cost_function,
            name=self.output_name,
        )(x)
        lr_schedule = self.decay_learning_rate() if self.lr_sched is True else self.lr
        self.model = Model(inputs, outputs, name=self.name)
        self.model.compile(
            loss=self.loss,
            optimizer=self.optimizer(learning_rate=lr_schedule),
            metrics=self.metrics,
        )
        return self.model

    def batch(self):
        """Yields batches with an equal number of positive and negative samples,
        rotating randomly. The generator loops over its data indefinitely; an
        epoch finishes when ``steps_per_epoch`` batches have been seen by the
        model.

        Yields
        ------
        tuple
            a single batch tuple of (inputs, targets)
        """
        # hb: half-batch
        hb = self.batch_size // 2
        # new arrays of the batch shape, without initializing entries
        xb = np.empty(
            (
                self.batch_size,
                self.X_train.shape[1],
                self.X_train.shape[2],
                self.X_train.shape[3],
                self.X_train.shape[4],
            ),
            dtype="float32",
        )
        yb = np.empty((self.batch_size, self.y_train.shape[1]), dtype="float32")
        pos = np.where(self.y_train[:, 0] == 1.0)[0]
        neg = np.where(self.y_train[:, 0] == 0.0)[0]
        # rotate through the samples randomly
        while True:
            np.random.shuffle(pos)
            np.random.shuffle(neg)
            xb[:hb] = self.X_train[pos[:hb]]
            xb[hb:] = self.X_train[neg[hb : self.batch_size]]
            yb[:hb] = self.y_train[pos[:hb]]
            yb[hb:] = self.y_train[neg[hb : self.batch_size]]
            for i in range(self.batch_size):
                xb[i] = augment_image(xb[i])
            yield xb, yb

class BuilderEnsemble(Builder):
    """Subclass for building and training an ensemble model (stacked MLP and 3D CNN).

    Parameters
    ----------
    Builder : class
        spacekit.builder.architect.Builder class object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        params=None,
        input_name="svm_mixed_inputs",
        output_name="ensemble_output",
        **builder_kwargs,
    ):
        train_data = (
            (X_train, y_train) if X_train is not None and y_train is not None else None
        )
        test_data = (
            (X_test, y_test) if X_test is not None and y_test is not None else None
        )
        super().__init__(
            train_data=train_data,
            test_data=test_data,
            logname="BuilderEnsemble",
            **builder_kwargs,
        )
        self.name = "ensembleSVM"
        self.input_name = input_name
        self.output_name = output_name
        self.blueprint = "ensemble"
        self.ensemble = True
        self.lr_sched = True
        self.params = params
        self.loss = "binary_crossentropy"
        self.optimizer = Adam
        self.metrics = ["accuracy"]
        self.activation = "leaky_relu"
        self.cost_function = "sigmoid"
        self.layers = [18, 32, 64, 32, 18]
        self.output_shape = 1
        # fit params:
        self.batch_size = 32
        self.epochs = 1000
        self.lr = 1e-4
        self.decay = [100000, 0.96]
        # None (not False) so `early_stopping is not None` checks behave correctly
        self.early_stopping = None
        self.verbose = 2
        if params is not None:
            self.fit_params(**params)
        self.step_size = X_train[0].shape[0] if X_train is not None else self.batch_size
        self.steps_per_epoch = self.step_size // self.batch_size
        self.batch_maker = self.batch

    def ensemble_mlp(self):
        """Compiles the MLP branch of the ensemble model.

        Returns
        -------
        spacekit.builder.architect.BuilderMLP
            the compiled MLP branch (also stored on the ``mlp`` attribute)
        """
        self.mlp = BuilderMLP(
            X_train=self.X_train[0],
            y_train=self.y_train,
            X_test=self.X_test[0],
            y_test=self.y_test,
            blueprint="ensemble",
        )
        # experimental (sets all attrs below)
        # self.mlp.get_blueprint("svm_mlp")
        self.mlp.input_name = "svm_regression_inputs"
        self.mlp.output_name = "svm_regression_output"
        self.mlp.name = "svm_mlp"
        self.mlp.ensemble = True
        self.mlp.input_shape = self.X_train[0].shape[1] if self.X_train is not None else None
        self.mlp.output_shape = 1
        self.mlp.layers = [18, 32, 64, 32, 18]
        self.mlp.activation = "leaky_relu"
        self.mlp.cost_function = "sigmoid"
        self.mlp.lr_sched = True
        self.mlp.optimizer = Adam
        self.mlp.loss = "binary_crossentropy"
        self.mlp.metrics = ["accuracy"]
        self.mlp.model = self.mlp.build()
        return self.mlp

    def ensemble_cnn(self):
        """Compiles the CNN branch of the ensemble model.

        Returns
        -------
        spacekit.builder.architect.BuilderCNN3D
            the compiled CNN branch (also stored on the ``cnn`` attribute)
        """
        self.cnn = BuilderCNN3D(
            X_train=self.X_train[1],
            y_train=self.y_train,
            X_test=self.X_test[1],
            y_test=self.y_test,
            blueprint="ensemble",
        )
        # self.cnn.get_blueprint("svm_cnn")
        self.cnn.input_name = "svm_image_inputs"
        self.cnn.output_name = "svm_image_output"
        self.cnn.name = "svm_cnn"
        self.cnn.ensemble = True
        self.cnn.input_shape = self.X_train[1].shape[1:] if self.X_train is not None else None
        self.cnn.output_shape = 1
        self.cnn.layers = [18, 32, 64, 32, 18]
        self.cnn.activation = "leaky_relu"
        self.cnn.cost_function = "sigmoid"
        self.cnn.lr_sched = True
        self.cnn.optimizer = Adam
        self.cnn.loss = "binary_crossentropy"
        self.cnn.metrics = ["accuracy"]
        self.cnn.model = self.cnn.build()
        return self.cnn

    def build(self):
        """Builds and compiles the ensemble model.

        Returns
        -------
        tf.keras.Model
            the compiled ensemble model (also stored on the ``model`` attribute)
        """
        self.mlp = self.ensemble_mlp()
        self.cnn = self.ensemble_cnn()
        combined_input = concatenate([self.mlp.model.output, self.cnn.model.output])
        x = Dense(9, activation=self.activation, name=self.input_name)(combined_input)
        x = Dense(1, activation=self.cost_function, name=self.output_name)(x)
        self.model = Model(
            inputs=[self.mlp.model.input, self.cnn.model.input],
            outputs=x,
            name=self.name,
        )
        lr_schedule = self.decay_learning_rate() if self.lr_sched is True else self.lr
        self.model.compile(
            loss=self.loss,
            optimizer=self.optimizer(learning_rate=lr_schedule),
            metrics=self.metrics,
        )
        return self.model

    def batch(self):
        """Yields batches with an equal number of positive and negative samples,
        rotating randomly. Each batch is a tuple of ((features, images), targets).
        The generator loops over its data indefinitely; an epoch finishes when
        ``steps_per_epoch`` batches have been seen by the model.

        Yields
        ------
        tuple
            a single batch tuple of ((xa, xb), yb)
        """
        # hb: half-batch
        hb = self.batch_size // 2
        # new arrays of the batch shape, without initializing entries
        xa = np.empty((self.batch_size, self.X_train[0].shape[1]), dtype="float32")
        xb = np.empty(
            (
                self.batch_size,
                self.X_train[1].shape[1],
                self.X_train[1].shape[2],
                self.X_train[1].shape[3],
                self.X_train[1].shape[4],
            ),
            dtype="float32",
        )
        yb = np.empty((self.batch_size, self.y_train.shape[1]), dtype="float32")
        pos = np.where(self.y_train[:, 0] == 1.0)[0]
        neg = np.where(self.y_train[:, 0] == 0.0)[0]
        # rotate through the samples randomly
        while True:
            np.random.shuffle(pos)
            np.random.shuffle(neg)
            xa[:hb] = self.X_train[0][pos[:hb]]
            xa[hb:] = self.X_train[0][neg[hb : self.batch_size]]
            xb[:hb] = self.X_train[1][pos[:hb]]
            xb[hb:] = self.X_train[1][neg[hb : self.batch_size]]
            yb[:hb] = self.y_train[pos[:hb]]
            yb[hb:] = self.y_train[neg[hb : self.batch_size]]
            for i in range(self.batch_size):
                xa[i] = augment_data(xa[i])
                xb[i] = augment_image(xb[i])
            yield (xa, xb), yb

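    # Usage sketch (hypothetical arrays): the ensemble takes paired inputs,
    # X_train = [features, image_cubes], and trains via the balanced generator:
    #
    #   ens = BuilderEnsemble(X_train=[X_feat, X_img], y_train=y,
    #                         X_test=[T_feat, T_img], y_test=yt)
    #   ens.build()
    #   ens.batch_fit()
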
class BuilderCNN2D(Builder):
    """Subclass Builder object for 2D convolutional neural networks.

    Parameters
    ----------
    Builder : class
        spacekit.builder.architect.Builder class object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        blueprint="cnn2d",
        **builder_kwargs,
    ):
        train_data = (
            (X_train, y_train) if X_train is not None and y_train is not None else None
        )
        test_data = (
            (X_test, y_test) if X_test is not None and y_test is not None else None
        )
        super().__init__(
            train_data=train_data,
            test_data=test_data,
            logname="BuilderCNN2D",
            **builder_kwargs,
        )
        self.blueprint = blueprint
        # explicit `is not None` checks: truth-testing a multi-element numpy
        # array raises a ValueError
        self.input_shape = self.X_train.shape[1:] if self.X_train is not None else None
        self.output_shape = 1
        self.input_name = "cnn2d_inputs"
        self.output_name = "cnn2d_output"
        self.kernel = 11
        self.activation = "relu"
        self.strides = 4
        self.filters = [8, 16, 32, 64]
        self.dense = [64, 64]
        self.dropout = [0.5, 0.25]
        self.optimizer = Adam
        self.lr = 1e-5
        self.lr_sched = False
        self.loss = "binary_crossentropy"
        self.metrics = ["accuracy"]
        self.decay = None
        self.early_stopping = None
        self.batch_size = 32
        self.cost_function = "sigmoid"
        # step_size is the number of training samples (axis 0)
        self.step_size = (
            self.X_train.shape[0] if self.X_train is not None else self.batch_size
        )
        self.steps_per_epoch = self.step_size // self.batch_size
        self.batch_maker = self.batch

    def build(self):
        """Builds and compiles a 2-dimensional Keras Functional CNN."""
        self.log.info("BUILDING MODEL...")
        inputs = Input(self.input_shape, name=self.input_name)
        self.log.info("INPUT LAYER")
        x = Conv1D(
            filters=self.filters[0],
            kernel_size=self.kernel,
            activation=self.activation,
            input_shape=self.input_shape,
        )(inputs)
        x = MaxPool1D(strides=self.strides)(x)
        x = BatchNormalization()(x)
        for i, f in enumerate(self.filters[1:], start=1):
            x = Conv1D(
                filters=f,
                kernel_size=self.kernel,
                activation=self.activation,
            )(x)
            x = MaxPool1D(strides=self.strides)(x)
            if i < len(self.filters) - 1:
                x = BatchNormalization()(x)
            else:
                # flatten after the final convolutional block
                x = Flatten()(x)
        self.log.info("DROPOUT")
        for drop, dense in list(zip(self.dropout, self.dense)):
            x = Dropout(drop)(x)
            x = Dense(dense, activation=self.activation)(x)
        if self.blueprint == "ensemble":
            self.cnn = Model(inputs, x, name="cnn2d_ensemble")
            return self.cnn
        self.log.info("FULL CONNECTION")
        outputs = Dense(
            units=self.output_shape,
            activation=self.cost_function,
            name=self.output_name,
        )(x)
        lr_schedule = self.decay_learning_rate() if self.lr_sched is True else self.lr
        self.model = Model(inputs, outputs, name=self.name)
        self.model.compile(
            loss=self.loss,
            optimizer=self.optimizer(learning_rate=lr_schedule),
            metrics=self.metrics,
        )
        self.log.info("COMPILED")
        return self.model

    def batch(self):
        """Yields batches with an equal number of positive and negative samples,
        rotating randomly. All arrays in a batch tuple have the same length (the
        batch size). The generator loops over its data indefinitely; an epoch
        finishes when ``steps_per_epoch`` batches have been seen by the model.

        Yields
        ------
        tuple
            a single batch tuple of (inputs, targets)
        """
        # hb: half-batch
        hb = self.batch_size // 2
        # new arrays of the batch shape, without initializing entries
        # e.g. X_train.shape = (5087, 3197, 2)
        xb = np.empty(
            (self.batch_size, self.X_train.shape[1], self.X_train.shape[2]),
            dtype="float32",
        )
        # e.g. y_train.shape = (5087, 1)
        yb = np.empty((self.batch_size, self.y_train.shape[1]), dtype="float32")
        pos = np.where(self.y_train[:, 0] == 1.0)[0]
        neg = np.where(self.y_train[:, 0] == 0.0)[0]
        # rotate through the samples randomly
        while True:
            np.random.shuffle(pos)
            np.random.shuffle(neg)
            xb[:hb] = self.X_train[pos[:hb]]
            xb[hb:] = self.X_train[neg[hb : self.batch_size]]
            yb[:hb] = self.y_train[pos[:hb]]
            yb[hb:] = self.y_train[neg[hb : self.batch_size]]
            for i in range(self.batch_size):
                # apply a random phase shift along the time axis
                size = np.random.randint(xb.shape[1])
                xb[i] = np.roll(xb[i], size, axis=0)
            yield xb, yb

""" Pre-fabricated Networks (hyperparameters tuned for specific projects) """
class MemoryClassifier(BuilderMLP):
    """Builds an MLP neural network classifier with pre-tuned parameters for
    Calcloud's memory bin classifier.

    Parameters
    ----------
    BuilderMLP : class
        spacekit.builder.architect.BuilderMLP multi-classification builder object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        blueprint="hst_mem_clf",
        test_idx=None,
        **builder_kwargs,
    ):
        # pass the arrays through BuilderMLP's signature; algorithm and logname
        # flow through its **builder_kwargs to the base Builder
        super().__init__(
            X_train=X_train,
            y_train=y_train,
            X_test=X_test,
            y_test=y_test,
            blueprint=blueprint,
            algorithm="multiclass",
            logname="MemoryClassifier",
            **builder_kwargs,
        )
        self.input_shape = self.X_train.shape[1] if self.X_train is not None else 9
        self.output_shape = 4
        self.layers = [18, 32, 64, 32, 18, 9]
        self.input_name = "hst_jobs"
        self.output_name = "memory_classifier"
        self.name = "mem_clf"
        self.activation = "relu"
        self.cost_function = "softmax"
        self.lr_sched = False
        self.optimizer = Adam
        self.loss = "categorical_crossentropy"
        self.metrics = ["accuracy"]
        self.test_idx = test_idx
        self.epochs = 60
        self.batch_size = 32

class MemoryRegressor(BuilderMLP):
    """Builds an MLP neural network regressor with pre-tuned parameters for
    estimating memory allocation (GB) in Calcloud.

    Parameters
    ----------
    BuilderMLP : class
        spacekit.builder.architect.BuilderMLP linear regression builder object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        blueprint="hst_mem_reg",
        test_idx=None,
        **builder_kwargs,
    ):
        super().__init__(
            X_train=X_train,
            y_train=y_train,
            X_test=X_test,
            y_test=y_test,
            blueprint=blueprint,
            algorithm="linreg",
            logname="MemoryRegressor",
            **builder_kwargs,
        )
        self.input_shape = self.X_train.shape[1] if self.X_train is not None else 9
        self.output_shape = 1
        self.layers = [18, 32, 64, 32, 18, 9]
        self.input_name = "hst_jobs"
        self.output_name = "memory_regressor"
        self.name = "mem_reg"
        self.activation = "relu"
        self.cost_function = "linear"
        self.lr_sched = False
        self.optimizer = Adam
        self.loss = "mse"
        self.metrics = [RMSE(name="rmse")]
        self.test_idx = test_idx
        self.epochs = 60
        self.batch_size = 32

class WallclockRegressor(BuilderMLP):
    """Builds an MLP neural network regressor with pre-tuned parameters for
    estimating wallclock allocation (minimum execution time in seconds) in
    Calcloud.

    Parameters
    ----------
    BuilderMLP : class
        spacekit.builder.architect.BuilderMLP linear regression builder object
    """

    def __init__(
        self,
        X_train=None,
        y_train=None,
        X_test=None,
        y_test=None,
        blueprint="hst_wall_reg",
        test_idx=None,
        **builder_kwargs,
    ):
        super().__init__(
            X_train=X_train,
            y_train=y_train,
            X_test=X_test,
            y_test=y_test,
            blueprint=blueprint,
            algorithm="linreg",
            logname="WallclockRegressor",
            **builder_kwargs,
        )
        self.input_shape = self.X_train.shape[1] if self.X_train is not None else 9
        self.output_shape = 1
        self.layers = [18, 32, 64, 128, 256, 128, 64, 32, 18, 9]
        self.input_name = "hst_jobs"
        self.output_name = "wallclock_regressor"
        self.name = "wall_reg"
        self.activation = "relu"
        self.cost_function = "linear"
        self.lr_sched = False
        self.optimizer = Adam
        self.loss = "mse"
        self.metrics = [RMSE(name="rmse")]
        self.test_idx = test_idx
        self.epochs = 300
        self.batch_size = 64

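# Usage sketch for the pre-fabricated networks (hypothetical data; by default
# the input layer expects the 9 "hst_jobs" features and the classifier outputs
# 4 memory bins):
#
#   clf = MemoryClassifier(X_train=X, y_train=y, X_test=Xt, y_test=yt)
#   clf.build()
#   clf.fit()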