import numpy as np
import tensorflow as tf
from ncem.models.layers import (Decoder, Encoder, get_out, PreprocInput, SamplingPrior)
[docs]class ModelCVAE:
"""Model class for conditional variational autoencoder."""
def __init__(
self,
input_shapes,
latent_dim: int = 10,
intermediate_dim_enc: int = 128,
intermediate_dim_dec: int = 128,
depth_enc: int = 1,
depth_dec: int = 1,
dropout_rate: float = 0.1,
l2_coef: float = 0.0,
l1_coef: float = 0.0,
use_domain: bool = False,
use_type_cond: bool = True,
use_batch_norm: bool = False,
scale_node_size: bool = False,
transform_input: bool = False,
output_layer="gaussian",
**kwargs
):
"""Initialize conditional variational autoencoder model.
Parameters
----------
input_shapes
input_shapes.
latent_dim : int
Latent dimension.
dropout_rate : float
Dropout rate.
l2_coef : float
l2 regularization coefficient.
l1_coef : float
l1 regularization coefficient.
intermediate_dim_enc : int
Encoder intermediate dimension.
depth_enc : int
Encoder depth.
intermediate_dim_dec : int
Decoder intermediate dimension.
depth_dec : int
Decoder depth.
use_domain : bool
Whether to use domain information.
use_type_cond : bool
Whether to use type conditional.
use_batch_norm : bool
Whether to use batch normalization.
scale_node_size : bool
Whether to scale output layer by node sizes.
transform_input : bool
Whether to transform input.
output_layer : str
Output layer.
kwargs
Arbitrary keyword arguments.
Raises
------
ValueError
If `output_layer` is not recognized.
"""
super().__init__()
self.args = {
"input_shapes": input_shapes,
"latent_dim": latent_dim,
"intermediate_dim_enc": intermediate_dim_enc,
"intermediate_dim_dec": intermediate_dim_dec,
"depth_enc": depth_enc,
"depth_dec": depth_dec,
"dropout_rate": dropout_rate,
"l2_coef": l2_coef,
"l1_coef": l1_coef,
"use_domain": use_domain,
"use_type_cond": use_type_cond,
"scale_node_size": scale_node_size,
"output_layer": output_layer,
}
out_node_feature_dim = input_shapes[1]
in_node_dim = input_shapes[3]
categ_condition_dim = input_shapes[4]
domain_dim = input_shapes[5]
# node_features - H: Input Tensor - shape=(None, N, F)
input_x = tf.keras.Input(shape=(in_node_dim, out_node_feature_dim), name="node_features")
# node size - reconstruction: Input Tensor - shape=(None, N, 1)
input_node_size = tf.keras.Input(shape=(in_node_dim, 1), name="node_size_reconstruct")
# Categorical predictors: Input Tensor - shape=(None, N, P)
input_categ_condition = tf.keras.Input(shape=(in_node_dim, categ_condition_dim), name="categorical_predictor")
# domain information of graph - shape=(None, 1)
input_g = tf.keras.layers.Input(shape=(domain_dim,), name="input_da_group", dtype="int32")
if use_domain:
categ_condition = tf.concat(
[
input_categ_condition,
tf.tile(tf.expand_dims(tf.cast(input_g, dtype="float32"), axis=-2), [1, in_node_dim, 1]),
],
axis=-1,
)
else:
categ_condition = input_categ_condition
# Decoder inputs:
# 1) Sample in mode:
latent_sampling1 = SamplingPrior(width=latent_dim)(input_x)
latent_sampling_reshaped1 = tf.reshape(latent_sampling1, [-1, latent_dim])
# 2) Sample in data intput:
input_latent_sampling2 = tf.keras.Input(shape=(in_node_dim, latent_dim), name="z_sampling")
latent_sampling_reshaped2 = tf.reshape(input_latent_sampling2, [-1, latent_dim])
if transform_input:
x = PreprocInput()(input_x)
else:
x = input_x
self.encoder_model = Encoder(
latent_dim=latent_dim,
intermediate_dim=intermediate_dim_enc,
dropout_rate=dropout_rate,
n_hidden=depth_enc,
l1_coef=l1_coef,
l2_coef=l2_coef,
use_type_cond=use_type_cond,
use_batch_norm=use_batch_norm,
probabilistic=True,
)
output_encoder = self.encoder_model((x, categ_condition))
z, z_mean, z_log_var = output_encoder
latent_space = tf.keras.layers.Concatenate(axis=1, name="bottleneck")([z, z_mean, z_log_var])
latent_space_sampling = tf.zeros_like(latent_space, name="bottleneck")
latent_space2 = tf.keras.layers.Concatenate(axis=1, name="bottleneck")(
[ # immitate latent_space tensor
tf.zeros_like(input_latent_sampling2),
tf.zeros_like(input_latent_sampling2),
tf.zeros_like(input_latent_sampling2),
]
)
self.decoder_model = Decoder(
intermediate_dim=intermediate_dim_dec,
dropout_rate=dropout_rate,
n_hidden=depth_dec,
l1_coef=l1_coef,
l2_coef=l2_coef,
use_type_cond=use_type_cond,
use_batch_norm=use_batch_norm,
)
output_decoder = self.decoder_model((z, categ_condition))
sampling_decoder1 = self.decoder_model((latent_sampling_reshaped1, categ_condition))
sampling_decoder2 = self.decoder_model((latent_sampling_reshaped2, categ_condition))
output_decoder_layer = get_out(
output_layer=output_layer, out_feature_dim=out_node_feature_dim, scale_node_size=scale_node_size
)((output_decoder, input_node_size))
output_sampling_decoder1 = get_out(
output_layer=output_layer, out_feature_dim=out_node_feature_dim, scale_node_size=scale_node_size,
name='sampling1'
)((sampling_decoder1, input_node_size))
output_sampling_decoder2 = get_out(
output_layer=output_layer, out_feature_dim=out_node_feature_dim, scale_node_size=scale_node_size,
name='sampling2'
)((sampling_decoder2, input_node_size))
output_decoder_concat = tf.keras.layers.Concatenate(axis=2, name="reconstruction")(output_decoder_layer)
output_sampling_concat1 = tf.keras.layers.Concatenate(axis=2, name="reconstruction")(output_sampling_decoder1)
output_sampling_concat2 = tf.keras.layers.Concatenate(axis=2, name="reconstruction")(output_sampling_decoder2)
self.encoder = tf.keras.Model(
inputs=[input_x, input_categ_condition, input_g], outputs=output_encoder, name="encoder"
)
self.decoder_sampling = tf.keras.Model(
inputs=[input_x, input_node_size, input_categ_condition, input_g],
outputs=[output_sampling_concat1, latent_space_sampling],
name="decoder_sampling",
)
self.decoder = tf.keras.Model(
inputs=[input_latent_sampling2, input_node_size, input_categ_condition, input_g],
outputs=[output_sampling_concat2, latent_space2],
name="decoder",
)
self.training_model = tf.keras.Model(
inputs=[input_x, input_node_size, input_categ_condition, input_g],
outputs=[output_decoder_concat, latent_space],
name="cvae",
)
# Add non-scaled ELBO to model as metric (ie no annealing or beta-VAE scaling):
log2pi = tf.math.log(2.0 * np.pi)
logqz_x = -0.5 * tf.reduce_mean(tf.square(z - z_mean) * tf.exp(-z_log_var) + z_log_var + log2pi)
logpz = -0.5 * tf.reduce_mean(tf.square(z) + log2pi)
d_kl = logqz_x - logpz
loc, scale = output_decoder_layer
if output_layer == "gaussian" or output_layer == "gaussian_const_disp":
neg_ll = tf.math.log(tf.sqrt(2 * np.math.pi) * scale) + 0.5 * tf.math.square(
loc - input_x
) / tf.math.square(scale)
elif output_layer == "nb" or output_layer == "nb_const_disp" or output_layer == "nb_shared_disp":
eta_loc = tf.math.log(loc)
eta_scale = tf.math.log(scale)
log_r_plus_mu = tf.math.log(scale + loc)
ll = tf.math.lgamma(scale + input_x)
ll = ll - tf.math.lgamma(input_x + tf.ones_like(input_x))
ll = ll - tf.math.lgamma(scale)
ll = ll + tf.multiply(input_x, eta_loc - log_r_plus_mu) + tf.multiply(scale, eta_scale - log_r_plus_mu)
neg_ll = -tf.clip_by_value(ll, -300, 300, "log_probs")
else:
neg_ll = None
neg_ll = tf.reduce_mean(tf.reduce_sum(neg_ll, axis=-1))
self.training_model.add_metric(neg_ll + d_kl, name="elbo", aggregation="mean")