Source code for aphin.layers.lti_layer

"""
TensorFlow layers for the identification of dynamical systems,
e.g. LTI systems (LTILayer), port-Hamiltonian systems (PHLayer), ...,
intended for standalone use or for use in the latent space of an autoencoder.
All layers approximate the right-hand side of the ODE given the states z(t), inputs u(t), and parameters mu,
and they should be trained using reference values of the states, inputs, parameters, and the left-hand side of the ODE,
which is assumed to depend only on the time derivative of the states.
"""

import tensorflow as tf
from tensorflow.python.ops.linalg.linear_operator_full_matrix import (
    LinearOperatorFullMatrix,
)
from aphin.layers import SystemLayer
from aphin.operators import LinearOperatorSkewSym, LinearOperatorSym

import numpy as np
import random
import logging


class LTILayer(SystemLayer):
    """
    Layer for the LTI approximation of the time derivative of the latent variable
        z'(t) = A * z(t) + B * u(t)
    with A = J - R, where J is skew-symmetric and R is symmetric.
    The system matrix is decomposed into a skew-symmetric part J and a symmetric
    part R for conformity with the pH layers.
    """

    def __init__(
        self,
        r,
        n_u=None,
        n_mu=None,
        regularizer=None,
        layer_sizes=[],
        activation="elu",
        **kwargs,
    ):
        """
        Initialize the LTILayer.

        Parameters
        ----------
        r : int
            Number of latent variables.
        n_u : int, optional
            Number of inputs, by default None.
        n_mu : int, optional
            Number of parameters, by default None.
        regularizer : tf.keras.regularizers.Regularizer, optional
            Regularizer object for trainable variables of the class, by default None.
        layer_sizes : list, optional
            Sizes of the layers for parameter-dependent weights, by default [].
        activation : str, optional
            Activation function to use, by default "elu".
        **kwargs : dict
            Additional arguments for the SystemLayer base class.
        """
        super(LTILayer, self).__init__(r, n_u, n_mu, regularizer, **kwargs)
        self.n_skew = int((r - 1) * r / 2)  # number of skew-symmetric DOFs
        self.n_sym = int((r + 1) * r / 2)  # number of symmetric DOFs
        # layer sizes for parameter-dependent weights
        self.layer_sizes = layer_sizes
        self.activation = activation
        if self.n_mu > 0:
            self.init_parameter_dependent_weights()
        else:
            self.init_weights()
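    # Worked example of the DOF counting above (added illustration, not part of
    # the original source): a skew-symmetric r x r matrix has r * (r - 1) / 2
    # independent entries and a symmetric one has r * (r + 1) / 2. For r = 3 this
    # gives n_skew = 3 and n_sym = 6; with n_u = 2 inputs, the full input matrix B
    # adds r * n_u = 6 entries, i.e. 3 + 6 + 6 = 15 trainable matrix DOFs in total.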
    def init_weights(self):
        """
        Initialize trainable variables.
        """
        # add trainable variables
        self.dof_J = self.add_weight(
            name="dof_J",
            shape=(self.n_skew,),
            initializer="uniform",
            trainable=True,
            regularizer=self.regularizer,
            dtype=self.dtype_,
        )
        self.dof_R = self.add_weight(
            name="dof_R",
            shape=(self.n_sym,),
            initializer="uniform",
            trainable=True,
            regularizer=self.regularizer,
            dtype=self.dtype_,
        )
        if self.n_u > 0:
            self.dof_B = self.add_weight(
                name="dof_B",
                shape=(self.r, self.n_u),
                initializer="uniform",
                trainable=True,
                regularizer=self.regularizer,
                dtype=self.dtype_,
            )
    @property
    def n_matrices_dofs(self):
        """
        Number of trainable variables for the matrices.

        Returns
        -------
        int
            Number of trainable variables.
        """
        return self.n_skew + self.n_sym + self.r * self.n_u

    @property
    def dof_split(self):
        """
        Split of the internal degrees of freedom into J, R, B.

        Returns
        -------
        tuple
            Tuple containing the split of the degrees of freedom.
        """
        return self.n_skew, self.n_sym, self.r * self.n_u, 0
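    # Example of the split above (added illustration, not part of the original
    # source): for r = 3 and n_u = 2, dof_split = (3, 6, 6, 0). The trailing 0
    # reserves an (empty) slot for the Q matrix of the PHQ layer, so
    # tf.split in get_parameter_dependent_weights works identically for all layers.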
    def init_parameter_dependent_weights(self):
        """
        Initialize trainable variables that depend on the parameters (mu).
        """
        input = tf.keras.Input(shape=(self.n_mu,))
        pred = input
        for i, layer_size in enumerate(self.layer_sizes):
            pred = tf.keras.layers.Dense(
                layer_size,
                activation=self.activation,
                # kernel_regularizer=self.regularizer,
                # bias_regularizer=self.regularizer,
                # activity_regularizer=self.regularizer,
            )(pred)
        # output layer mapping to the matrix DOFs
        pred = tf.keras.layers.Dense(
            self.n_matrices_dofs,
            activation="linear",
            kernel_regularizer=self.regularizer,
            bias_regularizer=self.regularizer,
            # activity_regularizer=self.regularizer,
        )(pred)
        self.parameter_dependent_weight_model = tf.keras.Model(
            inputs=input, outputs=pred
        )
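    # Shape sketch of the weight model above (added illustration, not part of the
    # original source): with n_mu = 2, layer_sizes = [16, 16], r = 3 and n_u = 2,
    # the model maps mu of shape (batch, 2) through two Dense(16, "elu") layers to
    # a linear output of shape (batch, 15) = (batch, n_matrices_dofs), i.e. one set
    # of matrix DOFs per parameter sample.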
    def get_parameter_dependent_weights(self, mu):
        """
        In the case of parameter-dependent weights, get the weights for the given parameters.

        Parameters
        ----------
        mu : array-like
            Parameters.

        Returns
        -------
        tuple
            Tuple containing dof_J, dof_R, dof_B, and dof_Q.
        """
        # call the fully connected layers
        dofs = self.parameter_dependent_weight_model(mu)
        # split into J, R, B (and Q, which is only implemented for the PHQ layer)
        dof_J, dof_R, dof_B, dof_Q = tf.split(dofs, self.dof_split, axis=1)
        # reshape the matrix DOFs into the correct shapes
        dof_J = tf.reshape(dof_J, (-1, self.n_skew))
        dof_R = tf.reshape(dof_R, (-1, self.n_sym))
        dof_B = tf.reshape(dof_B, (-1, self.r, self.n_u))
        if dof_Q.shape[1] != 0:
            dof_Q = tf.reshape(dof_Q, (-1, self.n_sym))
        return dof_J, dof_R, dof_B, dof_Q
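    # Example of the resulting shapes (added illustration, not part of the original
    # source): for r = 3, n_u = 2 and a batch of 4 parameter samples,
    # get_parameter_dependent_weights returns dof_J with shape (4, 3), dof_R with
    # shape (4, 6), dof_B with shape (4, 3, 2), and an empty dof_Q of shape (4, 0).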
    def get_system_matrices(self, mu=None, n_t=None):
        """
        Return the matrices in the format (n_sim, r, r).

        Parameters
        ----------
        mu : array-like, optional
            Parameters, by default None.
        n_t : int, optional
            Number of time steps, required if mu is not None, by default None.

        Returns
        -------
        tuple
            Tuple containing the matrices J, R, and B.
        """
        if mu is not None:
            self.dof_J, self.dof_R, self.dof_B, _ = (
                self.get_parameter_dependent_weights(mu)
            )
            # convert to matrices
            if n_t is None:
                raise ValueError("n_t is required in the parameter-dependent case.")
            J = np.reshape(self.J.to_dense().numpy(), (-1, n_t, self.r, self.r))
            R = np.reshape(self.R.to_dense().numpy(), (-1, n_t, self.r, self.r))
            B = np.reshape(self.B.to_dense().numpy(), (-1, n_t, self.r, self.n_u))
            # since all matrices are the same over n_t, remove the time axis and
            # transform to shape (n_sim, r, r)
            J = J[:, 0, :, :]
            R = R[:, 0, :, :]
            B = B[:, 0, :, :]
        else:
            # convert to matrices
            J = np.expand_dims(self.J.to_dense().numpy(), axis=0)
            R = np.expand_dims(self.R.to_dense().numpy(), axis=0)
            if self.n_u > 0:
                B = np.expand_dims(self.B.to_dense().numpy(), axis=0)
            else:
                B = None
        return J, R, B
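    # Usage sketch (added illustration, not part of the original source; `layer` is
    # a hypothetical LTILayer instance with r = 3, n_u = 2). In the parameter-
    # dependent case, mu is expected with one row per time step and simulation:
    #   J, R, B = layer.get_system_matrices(mu=mu, n_t=100)  # mu: (2 * 100, n_mu)
    #   # J.shape == R.shape == (2, 3, 3), B.shape == (2, 3, 2)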
# @tf.function
    def call(self, z, u=None, mu=None, training=False):
        """
        Evaluate the right-hand side of the ODE system z'(t) = f(z, u, mu) for the inputs (z, u, mu).

        Parameters
        ----------
        z : array-like
            System states with shape (n_t * n_s, r).
        u : array-like, optional
            System inputs with shape (n_t * n_s, n_u), by default None.
        mu : array-like, optional
            Parameters, by default None.
        training : bool, optional
            Whether the call is in training mode, by default False.

        Returns
        -------
        tf.Tensor
            Approximation of z'(t).
        """
        # update the matrices for the given parameters
        if self.n_mu > 0:
            self.dof_J, self.dof_R, self.dof_B, _ = (
                self.get_parameter_dependent_weights(mu)
            )
        # evaluate the LTI approximation of the time derivative of the latent variable
        if self.n_u == 0:
            dz_dt = self.J.matvec(z) - self.R.matvec(z)
        else:
            dz_dt = self.J.matvec(z) - self.R.matvec(z) + self.B.matvec(u)
        return dz_dt
    @property
    def J(self):
        """
        Get the skew-symmetric matrix J.

        Returns
        -------
        LinearOperator or tf.Tensor
            Skew-symmetric matrix J.
        """
        if not hasattr(self, "dof_J"):
            return tf.zeros([self.r, self.r])
        if self.n_skew == 0:
            return LinearOperatorSym(tf.zeros([self.r, self.r]))
        return LinearOperatorSkewSym(self.dof_J)

    @property
    def R(self):
        """
        Get the symmetric matrix R.

        Returns
        -------
        LinearOperator or tf.Tensor
            Symmetric matrix R.
        """
        if not hasattr(self, "dof_R"):
            return tf.zeros([self.r, self.r])
        if self.n_sym == 0:
            return LinearOperatorSym(tf.zeros([self.r, self.r]))
        # if R is only symmetric instead of sym. pos. def., an arbitrary LTI system
        # can be learned
        return LinearOperatorSym(self.dof_R)

    @property
    def B(self):
        """
        Get the (full) input matrix B.

        Returns
        -------
        LinearOperator or tf.Tensor
            Input matrix B.
        """
        if self.n_u == 0:
            return tf.zeros([self.r, self.n_u])
        return LinearOperatorFullMatrix(self.dof_B)
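

# Minimal usage sketch (added illustration, not part of the original source). It
# assumes that the SystemLayer base class normalizes n_u and n_mu to integers,
# as the comparisons self.n_u > 0 and self.n_mu > 0 above imply.
if __name__ == "__main__":
    lti = LTILayer(r=3, n_u=2)
    z = tf.random.normal((5, 3))  # batch of latent states
    u = tf.random.normal((5, 2))  # batch of inputs
    dz_dt = lti(z, u)  # approximation of z'(t) = (J - R) z(t) + B u(t)
    J, R, B = lti.get_system_matrices()  # dense matrices with a leading n_sim axis
    print(dz_dt.shape, J.shape, R.shape, B.shape)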