# Source code for aphin.identification.aphin

import numpy as np
from abc import ABC
import logging
from sklearn.decomposition import TruncatedSVD
import tensorflow as tf
import matplotlib.pyplot as plt

# own modules
from . import PHBasemodel
from aphin.layers import PHLayer
from aphin.utils import integrators

logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)


class APHIN(PHBasemodel, ABC):
    """
    Autoencoder-based port-Hamiltonian Identification Network (ApHIN).

    Combines a linear (PCA-based) projection, an optional nonlinear
    autoencoder, and a system layer (e.g. a port-Hamiltonian ``PHLayer``)
    that identifies the dynamics in the latent space.
    """

    def __init__(
        self,
        reduced_order,
        pca_order=None,
        x=None,
        u=None,
        mu=None,
        system_layer=None,
        layer_sizes=None,
        activation="selu",
        pca_scaling=False,
        use_pca=True,
        pca_only=False,
        l_rec: float = 1,
        l_dz: float = 1,
        l_dx: float = 1,
        l1=0,
        l2=0,
        dtype="float32",
        **kwargs,
    ):
        """
        Model to discover low-dimensional dynamics of a high-dimensional system
        using autoencoders and a layer for identification of other dynamical
        systems (see SystemLayer), e.g., a PHLayer (port-Hamiltonian).

        Parameters
        ----------
        reduced_order : int
            Order of the reduced model.
        pca_order : int, optional
            Order of the PCA model. Defaults to ``10 * reduced_order``.
        x : array-like
            Input data (full states) with shape (n_sim*n_t, n_f) with n_sim
            simulation scenarios, n_t time steps, n_f features.
            NOTE(review): declared optional, but it is cast and reshaped
            unconditionally below, so a value is effectively required.
        u : array-like, optional
            System inputs with shape (n_sim*n_t, n_u).
        mu : array-like, optional
            Parameter data (n_sim*n_t, n_mu).
        system_layer : SystemLayer or subclass instance, optional
            Instance that learns the reduced system in the latent space.
            Defaults to ``PHLayer(reduced_order)``.
        layer_sizes : list of int, optional
            Layer widths of the dense nonlinear autoencoder part.
        activation : str, optional
            Activation function of the dense nonlinear autoencoder part.
        pca_scaling : bool, optional
            If PCA modes should be scaled according to singular values.
        use_pca : bool, optional
            If PCA (linear MOR) should be performed before/after the
            nonlinear autoencoder part.
        pca_only : bool, optional
            If only PCA (linear MOR) should be performed instead of the
            nonlinear autoencoder part.
        l_rec : float, optional
            Weight of the reconstruction loss.
        l_dz : float, optional
            Weight of the derivative loss in the latent space.
        l_dx : float, optional
            Weight of the derivative loss in the physical space.
        l1 : float, optional
            L1 regularization factor.
        l2 : float, optional
            L2 regularization factor.
        dtype : str, optional
            Data type for the model.
        kwargs : dict
            Additional keyword arguments, forwarded to ``PHBasemodel``.
        """
        # tf.keras.backend.set_floatx(dtype)
        self.dtype_ = dtype
        super(APHIN, self).__init__(**kwargs)

        # general parameters
        self.system_optimizer = None
        if layer_sizes is None:
            self.layer_sizes = [10, 10, 10]
        else:
            self.layer_sizes = layer_sizes
        self.activation = tf.keras.activations.get(activation)

        # pca related parameters
        if pca_order is None:
            pca_order = 10 * reduced_order
        self.reduced_order = reduced_order
        self.pca_order = pca_order
        self.pca_scaling_individual = pca_scaling
        self.scale_factor = None
        self.use_pca = use_pca
        self.pca_only = pca_only
        if self.pca_only:
            # pure linear MOR: force PCA on, shrink it to the reduced order,
            # and drop all nonlinear layers
            self.use_pca = True
            self.pca_order = self.reduced_order
            self.layer_sizes = []
        if system_layer is None:
            self.system_layer = PHLayer(self.reduced_order)
        else:
            self.system_layer = system_layer

        # weighting of the different losses
        self.l_rec, self.l_dz, self.l_dx = l_rec, l_dz, l_dx
        self.regularizer = tf.keras.regularizers.l1_l2(l1=l1, l2=l2)

        # create the model
        x = tf.cast(x, dtype=self.dtype_)
        self.x_shape = x.shape[1:]
        if u is not None:
            u = tf.cast(u, dtype=self.dtype_)
            self.u_shape = u.shape[1:]
        if mu is not None:
            mu = tf.cast(mu, dtype=self.dtype_)
            self.mu_shape = mu.shape[1:]
        if not use_pca:
            # without PCA the "intermediate" space is the full state space
            self.pca_order = x.shape[1]
        # some subclasses initialize weights before building the model
        if hasattr(self, "init_weights"):
            self.init_weights()
        self.build_model(x, u, mu)

        # create loss tracker
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        self.rec_loss_tracker = tf.keras.metrics.Mean(name="rec_loss")
        self.dz_loss_tracker = tf.keras.metrics.Mean(name="dz_loss")
        self.dx_loss_tracker = tf.keras.metrics.Mean(name="dx_loss")
        self.reg_loss_tracker = tf.keras.metrics.Mean(name="reg_loss")

        # decide which loss needs to be evaluated
        # only perform reconstruction if no identification loss is used
        if self.l_dx == 0.0 and self.l_dz == 0.0:
            self.get_loss = self._get_loss_rec
        # calculate loss for first order systems
        else:
            self.get_loss = self._get_loss

        # save parsed arguments
        # NOTE: locals() is captured here, so local variable names are part
        # of the stored configuration and must not be renamed casually
        if not hasattr(self, "config"):
            self._init_to_config(locals())
[docs] def get_trainable_weights(self): """ Returns the trainable weights of the model. Returns ------- list List of trainable weights. """ return ( self.encoder.trainable_weights + self.decoder.trainable_weights + self.system_network.trainable_weights )
    def build_model(self, x, u, mu):
        """
        Build the model.

        Wires the autoencoder and the system layer into the Keras sub-models
        ``pca_encoder``, ``pca_decoder``, ``nonlinear_encoder``,
        ``nonlinear_decoder``, ``encoder``, ``decoder`` and ``system_network``
        (all stored as attributes).

        Parameters
        ----------
        x : array-like
            Full state with shape (n_samples, n_features).
        u : array-like, optional
            Inputs with shape (n_samples, n_inputs).
        mu : array-like, optional
            Parameters with shape (n_samples, n_params).
        """
        x_input, z_pca, z, z_dec, x = self.build_autoencoder(x)
        # System inputs (zero-width placeholder input when the system has none)
        if u is not None:
            u_input = tf.keras.Input(shape=(u.shape[1],))
        else:
            u_input = tf.keras.Input(shape=(0,))
        # Simulation parameters (zero-width placeholder input when there are none)
        if mu is not None:
            mu_input = tf.keras.Input(shape=(mu.shape[1],))
        else:
            mu_input = tf.keras.Input(shape=(0,))
        # predicted latent time derivative from the system layer
        dz_dt_system = self.system_layer(z, u_input, mu_input)
        # linear autoencoder part from full to intermediate latent space
        self.pca_encoder = tf.keras.Model(
            inputs=x_input, outputs=z_pca, name="pca_encoder"
        )
        self.pca_decoder = tf.keras.Model(inputs=z_dec, outputs=x, name="pca_decoder")
        # nonlinear autoencoder part from intermediate latent space to latent space
        self.nonlinear_encoder = tf.keras.Model(
            inputs=z_pca, outputs=z, name="nonlinear_encoder"
        )
        self.nonlinear_decoder = tf.keras.Model(
            inputs=z, outputs=z_dec, name="nonlinear_decoder"
        )
        # global autoencoder
        self.encoder = tf.keras.Model(inputs=x_input, outputs=z, name="encoder")
        self.decoder = tf.keras.Model(inputs=z, outputs=x, name="decoder")
        # network for discovery of dynamic system (e.g. pH system)
        self.system_network = tf.keras.Model(
            inputs=[z, u_input, mu_input], outputs=dz_dt_system, name="system"
        )
[docs] def build_autoencoder(self, x): """ Build the encoder and decoder of the autoencoder. Parameters ---------- x : array-like Input data. Returns ------- tuple Tuple containing inputs and outputs of the autoencoder. """ x_input = tf.keras.Input(shape=(x.shape[1],)) # first part of the encoder may consist of a linear projection based on PCA z_pca = self.build_pca_encoder(x, x_input) # second part of the encoder and first part of the decoder is a nonlinear part z, z_dec = self.build_nonlinear_autoencoder(z_pca) # second part of the decoder is a back projection based on PCA x = self.build_pca_decoder(z_dec) return x_input, z_pca, z, z_dec, x
    def build_pca_encoder(self, x, x_input):
        """
        Calculate the PCA of the data and build a linear encoder which is
        equivalent to the PCA.

        Also stores the projection matrices (``down``/``up``), the singular
        values, and the scaling factor as attributes.

        Parameters
        ----------
        x : array-like
            Input data.
        x_input : tf.Tensor
            Input tensor.

        Returns
        -------
        tf.Tensor
            Encoded PCA tensor.
        """
        # if PCA is used, calculate the PCA and build a linear encoder
        if self.use_pca:
            # calculate the PCA
            pca = TruncatedSVD(n_components=self.pca_order)
            x = pca.fit_transform(x)  # x is in PCA coordinates from here on
            # use the projection matrix as linear encoder
            self.down = tf.cast(pca.components_, dtype=self.dtype_)
            self.up = tf.cast(pca.components_.T, dtype=self.dtype_)
            self.singular_values = pca.singular_values_
            z_pca = x_input @ tf.transpose(self.down)
        # in case no PCA is used, the encoder is just the identity
        else:
            z_pca = x_input * 1
        # if individual scaling is used, scale every feature individually
        if self.pca_scaling_individual:
            if self.use_pca:
                # scale by singular values to maintain the right variance
                self.scale_factor = 1 / tf.sqrt(
                    tf.cast(pca.singular_values_, dtype=self.dtype_)
                )
            else:
                # scale every feature by the square root of its maximum
                # absolute value to avoid numerical issues
                # NOTE(review): the sqrt mirrors the singular-value branch,
                # but confirm it is intended rather than a plain max scaling
                self.scale_factor = 1 / tf.sqrt(tf.reduce_max(tf.abs(x), axis=0))
        # if no individual scaling is used, scale the whole data set by its maximum value
        # (when PCA is active, x holds PCA coordinates at this point)
        else:
            self.scale_factor = 1 / tf.reduce_max(tf.abs(x))
        # apply scaling
        z_pca = z_pca * self.scale_factor
        return z_pca
[docs] def build_pca_decoder(self, z_dec): """ Build a linear decoder which is equivalent to the backprojection of the PCA. Parameters ---------- z_dec : tf.Tensor Decoded PCA tensor. Returns ------- tf.Tensor Decoded tensor. """ # apply scaling z_dec = z_dec / self.scale_factor # pca part if self.use_pca: x = z_dec @ tf.transpose(self.up) # in case no PCA is used, the decoder is just the identity else: x = z_dec * 1 return x
[docs] def build_nonlinear_autoencoder(self, z_pca): """ Build a fully connected autoencoder with layers of size layer_sizes. Parameters ---------- z_pca : tf.Tensor Input to the autoencoder. Returns ------- tuple Tuple containing encoded and decoded tensors. """ if self.pca_only: # Use only PCA and no nonlinear autoencoder z = tf.keras.layers.Lambda(lambda x: x * 1.0)(z_pca) z_dec = tf.keras.layers.Lambda(lambda x: x * 1.0)(z) return z, z_dec z = z_pca for n_neurons in self.layer_sizes: z = tf.keras.layers.Dense( n_neurons, activation=self.activation, activity_regularizer=self.regularizer, )(z) z = tf.keras.layers.Dense(self.reduced_order, activation="linear")(z) # new decoder x_ = z for n_neurons in reversed(self.layer_sizes): x_ = tf.keras.layers.Dense(n_neurons, activation=self.activation)(x_) z_dec = tf.keras.layers.Dense(self.pca_order, activation="linear")(x_) return z, z_dec
[docs] @tf.function def train_step(self, inputs): """ Perform one training step. Parameters ---------- inputs : array-like Input data. Returns ------- dict Dictionary containing loss values. """ # perform forward pass, calculate loss and update weights rec_loss, dz_loss, dx_loss, reg_loss, loss = self.build_loss(inputs) # update loss tracker self.loss_tracker.update_state(loss) self.rec_loss_tracker.update_state(rec_loss) self.dz_loss_tracker.update_state(dz_loss) self.dx_loss_tracker.update_state(dx_loss) self.reg_loss_tracker.update_state(reg_loss) return { "loss": self.loss_tracker.result(), "rec_loss": self.rec_loss_tracker.result(), "dz_loss": self.dz_loss_tracker.result(), "dx_loss": self.dx_loss_tracker.result(), "reg_loss": self.reg_loss_tracker.result(), }
[docs] @tf.function def test_step(self, inputs): """ Perform one test step. Parameters ---------- inputs : array-like Input data. Returns ------- dict Dictionary containing loss values. """ rec_loss, dz_loss, dx_loss, reg_loss, loss = self.build_loss(inputs) self.loss_tracker.update_state(loss) self.rec_loss_tracker.update_state(rec_loss) self.dz_loss_tracker.update_state(dz_loss) self.dx_loss_tracker.update_state(dx_loss) self.reg_loss_tracker.update_state(reg_loss) return { "loss": loss, "rec_loss": rec_loss, "dz_loss": dz_loss, "dx_loss": dx_loss, "reg_loss": reg_loss, }
    def calc_latent_time_derivatives(self, x, dx_dt):
        """
        Calculate time derivatives of latent variables given the time
        derivatives of the input variables.

        Parameters
        ----------
        x : array-like
            Full state with shape (n_samples, n_features).
        dx_dt : array-like
            Time derivative of state with shape (n_samples, n_features).

        Returns
        -------
        tuple
            Tuple containing latent variables and their time derivatives
            (both as numpy arrays).
        """
        x = tf.cast(x, self.dtype_)
        # trailing singleton axis so the chain-rule matmul below lines up
        dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)

        # forward pass of encoder and time derivative of latent variable
        with tf.GradientTape() as t12:
            xr = self.pca_encoder(x)
            t12.watch(xr)
            z = self.nonlinear_encoder(xr)
        dz_dxr = t12.batch_jacobian(z, xr)

        # calculate first time derivative of the latent variable by application of the chain rule
        # dz_ddt = dz_dx @ dx_dt
        #        = dz_dxr @ (V^T @ dx_dt)
        # NOTE(review): dx_dt already carries a trailing singleton axis when it
        # is passed through pca_encoder here — confirm the encoder broadcasts
        # over that axis as intended
        try:
            dz_dt = dz_dxr @ tf.expand_dims(self.pca_encoder(dx_dt), axis=-1)
        # for convolutional autoencoder data has another shape and must be vectorized first
        except tf.errors.InvalidArgumentError:
            dz_dxr, dx_dt = self.reshape_conv_data(
                dz_dxr, tf.expand_dims(self.pca_encoder(dx_dt), axis=-1)
            )
            dz_dt = dz_dxr @ dx_dt
        # drop the singleton matmul axis again
        dz_dt = tf.squeeze(dz_dt, axis=2)
        return z.numpy(), dz_dt.numpy()
    def calc_pca_time_derivatives(self, x, dx_dt):
        """
        Calculate time derivatives of PCA variables given the time derivatives
        of the input variables.

        Since the PCA encoder is linear, the derivative is simply the encoder
        applied to ``dx_dt``.

        Parameters
        ----------
        x : array-like
            Full state with shape (n_samples, n_features).
        dx_dt : array-like
            Time derivative of state with shape (n_samples, n_features).

        Returns
        -------
        tuple
            Tuple containing PCA coordinates and their time derivatives
            (both as numpy arrays).
        """
        x = tf.cast(x, self.dtype_)
        dx_dt = tf.expand_dims(tf.cast(dx_dt, dtype=self.dtype_), axis=-1)

        # forward pass of encoder and time derivative of latent variable
        # NOTE(review): the tape is never used for a jacobian here (the PCA
        # map is linear), so the watch below appears vestigial
        with tf.GradientTape() as t12:
            z_pca = self.pca_encoder(x)
            t12.watch(z_pca)
        # linearity of the PCA projection: d(V^T x)/dt = V^T dx_dt
        # NOTE(review): the expand_dims/squeeze pair cancels out — kept as-is
        dz_pca_dt = tf.expand_dims(self.pca_encoder(dx_dt), axis=-1)
        dz_pca_dt = tf.squeeze(dz_pca_dt, axis=-1)
        return z_pca.numpy(), dz_pca_dt.numpy()
    def calc_physical_time_derivatives(self, z, dz_dt):
        """
        Calculate time derivatives of physical variables given the time
        derivatives of the latent variables.

        Parameters
        ----------
        z : array-like
            Latent state with shape (n_samples, r).
        dz_dt : array-like
            Time derivative of latent state with shape (n_samples, r).

        Returns
        -------
        tuple
            Tuple containing physical variables and their time derivatives
            (both as numpy arrays).
        """
        z = tf.cast(z, self.dtype_)
        # trailing singleton axis so the chain-rule matmul below lines up
        dz_dt = tf.expand_dims(tf.cast(dz_dt, dtype=self.dtype_), axis=-1)

        # forward pass of decoder and time derivative of intermediate variable
        with tf.GradientTape() as t12:
            t12.watch(z)
            xr = self.nonlinear_decoder(z)
            x = self.pca_decoder(xr)
        dxr_dz = t12.batch_jacobian(xr, z)

        # calculate first time derivative of the physical variable by application of the chain rule
        # dx_ddt = dx_dz @ dz_dt
        #        = V dxr_dz @ dz_dt
        # NOTE(review): dxr_dz @ dz_dt is rank-3; confirm pca_decoder
        # broadcasts over the trailing axis as intended
        dx_dt = self.pca_decoder(dxr_dz @ dz_dt)
        return x.numpy(), dx_dt.numpy()
@tf.function def _get_loss_rec(self, x, dx_dt, u, mu): """ Calculate reconstruction loss of autoencoder. Parameters ---------- x : array-like Full state with shape (n_samples, n_features). dx_dt : array-like Time derivative of state with shape (n_samples, n_features). u : array-like System inputs with shape (n_samples, n_inputs). mu : array-like System parameters with shape (n_samples, n_params). Returns ------- tuple Tuple containing individual losses and total loss. """ xr = self.pca_encoder(x) z = self.nonlinear_encoder(xr) xr_ = self.nonlinear_decoder(z) # we only calculate the reconstruction loss for the reconstruction of the pca encoded input # because this is the only trainable part and the computational cost is reduced rec_loss = self.l_rec * self.compute_loss(_, xr, xr_) # for conformity with other get_loss functions return 0 for other losses dz_loss = 0 dx_loss = 0 # get model losses (e.g. regularization) reg_loss = tf.reduce_mean(self.losses) if self.losses else 0 total_loss = rec_loss + dz_loss + dx_loss + reg_loss return rec_loss, dz_loss, dx_loss, reg_loss, total_loss @tf.function def _get_loss(self, x, dx_dt, u, mu): """ Calculate loss. Parameters ---------- x : array-like Full state with shape (n_samples, n_features). dx_dt : array-like Time derivative of state with shape (n_samples, n_features). u : array-like System inputs with shape (n_samples, n_inputs). mu : array-like System parameters with shape (n_samples, n_params). Returns ------- tuple Tuple containing individual losses and total loss. 
""" # time derivative of intermediate latent space dxr_dt = tf.expand_dims( self.pca_encoder(tf.cast(dx_dt, dtype=self.dtype_)), axis=-1 ) # forward pass of encoder and time derivative of latent variable with tf.GradientTape() as t12: # linear projection of input to intermediate latent space xr = self.pca_encoder(x) t12.watch(xr) # nonlinear mapping of intermediate latent space to latent space z = self.nonlinear_encoder(xr) dz_dxr = t12.batch_jacobian(z, xr) # the second part of the loss calculation is similar for all subclasses and consequently outsourced rec_loss, dz_loss, dx_loss = self.get_loss_second_part( xr, dz_dxr, dxr_dt, z, u, mu ) # get model losses (e.g. regularization) reg_loss = tf.reduce_mean(self.losses) if self.losses else 0 if self.system_network.losses: reg_loss += tf.math.add_n(self.system_network.losses) # calculate total loss as weighted sum of individual losses total_loss = rec_loss + dz_loss + dx_loss + reg_loss return rec_loss, dz_loss, dx_loss, reg_loss, total_loss
    def get_loss_second_part(self, xr, dz_dxr, dxr_dt, z, u, mu):
        """
        Second part of loss calculation (loss calculation is split into two
        parts as the second part differs for the different autoencoder
        implementations, while the first part remains the same).

        Parameters
        ----------
        xr : tf.Tensor
            Intermediate latent space tensor.
        dz_dxr : tf.Tensor
            Jacobian of latent variables with respect to intermediate latent
            space.
        dxr_dt : tf.Tensor
            Time derivative of intermediate latent space.
        z : tf.Tensor
            Latent variables.
        u : tf.Tensor
            System inputs.
        mu : tf.Tensor
            System parameters.

        Returns
        -------
        tuple
            (rec_loss, dz_loss, dx_loss).
        """
        # calculate first time derivative of the latent variable by application of the chain rule
        # dz_ddt = dz_dx @ dx_dt
        #        = dz_dxr @ (V^T @ dx_dt)
        dz_dt = dz_dxr @ dxr_dt
        # calculate left hand side of ODE system (relevant for descriptor systems)
        dxr_dt_lhs = self.system_layer.lhs(dxr_dt)
        dz_dt_lhs = self.system_layer.lhs(dz_dt)
        # system_network approximation of the time derivative of the latent variable
        system_pred = self.system_network([z, u, mu])
        dz_dt_system = tf.expand_dims(system_pred, -1)
        # forward pass of decoder and time derivative of reconstructed variable
        with tf.GradientTape() as t22:
            t22.watch(z)
            xr_ = self.nonlinear_decoder(z)
        # we only calculate this if the loss is not zero to save computational cost
        if self.l_dx > 0.0:
            dxr_dz = t22.batch_jacobian(xr_, z)
            # reshape in case of convolutional autoencoder
            dxr_dz = self.reshape_dxr_dz(dxr_dz)
            # calculate first time derivative of the reconstructed state by
            # application of the chain rule
            dxf_dt = dxr_dz @ dz_dt_system
            dx_loss = self.l_dx * self.compute_loss(z, dxf_dt, dxr_dt_lhs)
        else:
            dx_loss = 0.0
        # calculate losses
        rec_loss = self.l_rec * self.compute_loss(z, xr, xr_)
        # latent derivative loss, normalized by the mean magnitude of the target
        dz_loss = (
            self.l_dz
            * self.compute_loss(z, dz_dt_lhs, dz_dt_system)
            / tf.reduce_mean(tf.abs(dz_dt_lhs))
        )
        return rec_loss, dz_loss, dx_loss
    def reshape_dxr_dz(self, dxr_dz):
        """
        Reshape data for conformity with Convolutional Autoencoder.

        Identity hook in this base class; convolutional subclasses override
        it to vectorize the Jacobian.

        Parameters
        ----------
        dxr_dz : tf.Tensor
            Jacobian of reconstructed state with respect to latent variables.

        Returns
        -------
        dxr_dz : tf.Tensor
            Same as input (no-op here).
        """
        return dxr_dz
    def vis_modes(self, x, mode_ids=3, latent_ids=None, block=True):
        """
        Visualize the reconstruction of the reduced coefficients of the PCA
        modes.

        Parameters
        ----------
        x : array-like
            Original dataset.
        mode_ids : int or array-like, optional
            Scalar (plots the first ``mode_ids`` modes) or array (plots modes
            with indices from ``mode_ids``).
        latent_ids : int or array-like, optional
            Scalar (plots the first ``latent_ids`` latent coordinates) or
            array (plots latent coordinates with indices from ``latent_ids``);
            default plots all latent coordinates.
        block : bool, optional
            Whether to block on the matplotlib window.

        Returns
        -------
        None
        """
        modes = self.pca_encoder(x)
        if isinstance(
            mode_ids, (list, tuple, np.ndarray)
        ):  # check if n_modes is an array
            n_modes = len(mode_ids)
        else:
            # n_modes is scalar: plot modes from 1:n_modes
            n_modes = min(mode_ids, modes.shape[1])
            mode_ids = list(range(n_modes))
        z = self.nonlinear_encoder(modes)
        modes_rec = self.nonlinear_decoder(z)
        # define number of latent coordinate plots (default: plot all latent coordinates)
        if isinstance(
            latent_ids, (list, tuple, np.ndarray)
        ):  # check if n_latent is an array
            n_latent_plots = len(latent_ids)
        elif latent_ids is None:
            # show all latent coordinates
            n_latent_plots = self.reduced_order
            latent_ids = list(range(n_latent_plots))
        else:
            # n_latent is scalar: plot latent coordinates from 1:n_latent
            n_latent_plots = min(latent_ids, self.reduced_order)
            latent_ids = list(range(n_latent_plots))
        # visualize modes in subplots
        n_plots = n_modes + n_latent_plots if self.use_pca else n_latent_plots
        fig, axs = plt.subplots(n_plots, 1, figsize=(10, 10), sharex=True)
        # plot latent variables
        for i in range(n_latent_plots):
            latent_id = latent_ids[i]
            axs[i].plot(z[:, latent_id], color="k")
            axs[i].set_title(f"z_{i}")
        # plot PCA modes (original and reconstructed by latent variables)
        if self.use_pca:
            for i in range(n_modes):
                mode_id = mode_ids[i]
                axs[i + n_latent_plots].plot(modes[:, mode_id], label="Original")
                axs[i + n_latent_plots].plot(
                    modes_rec[:, mode_id], "--", label="Reconstructed"
                )
                # NOTE(review): title literal reconstructed from a line break
                # introduced by HTML extraction — confirm against original
                axs[i + n_latent_plots].set_title(
                    rf"Mode {mode_id}, $\sigma = {self.singular_values[mode_id]:.4g}$"
                )
                # add legend
                axs[i + n_latent_plots].legend()
        fig.tight_layout()
        plt.show(block=block)
# @tf.function
[docs] def encode(self, x): """ Encode full state. Parameters ---------- x : array-like Full state with shape (n_samples, n_features, n_dof_per_feature). Returns ------- z : array-like Latent variable with shape (n_samples, reduced_order). """ z = self.encoder(x) return z
# @tf.function
[docs] def decode(self, z): """ Decode latent variable. Parameters ---------- z : array-like Latent variable with shape (n_samples, reduced_order). Returns ------- x : array-like Full state with shape (n_samples, n_features, n_dof_per_feature). """ x_rec = self.decoder(z) return x_rec
# @tf.function
[docs] def reconstruct(self, x, _=None): """ Reconstruct full state. Parameters ---------- x : array-like Full state with shape (n_samples, n_features, n_dof_per_feature). Returns ------- x_rec : array-like Reconstructed full state with shape (n_samples, n_features, n_dof_per_feature). """ z = self.encode(x) x_rec = self.decode(z) return x_rec
# %% time integrator for latent variables (pH structure-preserving integrator)
[docs] def implicit_midpoint( self, t0, z0, t_bound, step_size, B=None, u=None, decomp_option=1 ): """ Calculate time integration of linear ODE through implicit midpoint rule ODE system E*dz_dt = A*z + B*u Theory: We got a pH-system E*Dx = (J-D)*Q*x + B*u we define A:=(J-D)*Q and the RHS as f(t,x) use the differential slope equation at midpoint (x(t+h)-x(t))/h=Dx(t+h/2)=E^-1 * f(t+h/2,x(t+h/2)) since x(t+h/2) is unknown we use the approximation x(t+h/2) = 1/2*(x(t)+x(t+h)) insert the linear system into the differential equation leads to x(t+h) = x(t) + h * E^-1 *(1/2*A*(x(t)+x(t+h))+ B*u(t+h/2)) reformulate the equation to (E-h/2*A)x(t+h) = (E+h/2*A)*x(t) + h*B*u(t+h/2) solve the linear equation system, e.g. via LU-decomposition Parameters ---------- t0 : float Initial time. z0 : array-like Initial state vector. t_bound : float End time. step_size : float Constant step width. B : array-like, optional Input matrix, default is None (will be set to zero). u : callable, optional Input function at time midpoints, default is None (will be set to zero). decomp_option : int, optional Option for decomposition (1-lu_solve), default is 1. Returns ------- z : array-like Integrated state vector. ----------------------------------------------------------------------- """ # todo: define self variables that are used integrators.implicit_midpoint( self.A, self.E, t0, z0, t_bound, step_size, B, u, decomp_option )
[docs] def get_projection_properties(self, x=None, x_test=None, file_dir=None): """ Compute and save the projection and Jacobian error. Parameters ---------- x : array-like, optional Training data. x_test : array-like, optional Test data. file_dir : str, optional Directory to save the projection properties. Returns ------- tuple Tuple containing projection and Jacobian errors for training and test data. """ projection_error, jacobian_error, projection_error_test, jacobian_error_test = [ None ] * 4 if x is not None: projection_error, jacobian_error = self.projection_properties(x) if x_test is not None: projection_error_test, jacobian_error_test = self.projection_properties( x_test ) with open(file_dir, "w") as text_file: if x is not None: print(f"TRAIN projection error: {projection_error}", file=text_file) print(f"TRAIN jacobian error: {jacobian_error}", file=text_file) if x_test is not None: print(f"TEST projection error: {projection_error_test}", file=text_file) print(f"TEST jacobian error: {jacobian_error_test}", file=text_file) return ( projection_error, jacobian_error, projection_error_test, jacobian_error_test, )
    def projection_properties(self, x):
        """
        Compute the projection and Jacobian error of the nonlinear
        autoencoder.

        Checks how well the autoencoder satisfies the projection properties
        ``enc(dec(z)) == z`` and ``J_enc @ J_dec == I``.

        Parameters
        ----------
        x : array-like
            Input data.

        Returns
        -------
        tuple
            (projection_error, jacobian_error) — relative mean-squared
            projection error and mean-squared deviation of the Jacobian
            product from identity.
        """
        z = self.encode(x)
        # we only need to evaluate the nonlinear autoencoder since the linear
        # one is a projection per definition
        # forward pass of decoder and its Jacobian w.r.t. the latent code
        with tf.GradientTape() as t1:
            t1.watch(z)
            v_rec = self.nonlinear_decoder(z)
        jac_z = t1.batch_jacobian(v_rec, z)
        # forward pass of encoder on the decoded state and its Jacobian
        with tf.GradientTape() as t2:
            t2.watch(v_rec)
            z_rec = self.nonlinear_encoder(v_rec)
        jac_x = t2.batch_jacobian(z_rec, v_rec)
        # calculate to which extent the autoencoder meets the projection properties
        # 1. enc(dec(z)) == z (relative error, normalized per sample)
        projection_error = tf.reduce_mean(
            tf.square(tf.norm(z - z_rec, axis=1, ord=2))
            / tf.square(tf.norm(z, axis=1, ord=2))
        )
        # 2. jacobian_z(dec(z)) @ jacobian_dec(z)(z) == I
        jacobian_error = tf.reduce_mean(
            tf.square(
                tf.norm(
                    tf.eye(self.reduced_order) - (jac_x @ jac_z), axis=(1, 2), ord=2
                )
            )
        )
        return projection_error, jacobian_error