Assignment
Name: Sidharth P    Roll No: AM.EN.U4EAC21063
19ECE354 DEEP LEARNING
Assignment 2
Autoencoders are a type of artificial neural network used to learn efficient codings of input data in an unsupervised manner. They consist of two main parts:
- Encoder: This part compresses the input data into a lower-dimensional representation, called the latent space or bottleneck.
- Decoder: This part reconstructs the input data from the compressed representation.
How Autoencoders Work
- Training: Autoencoders are trained to minimize the difference between the input and the reconstructed output, typically using a loss function like mean squared error.
- Compression: The encoder learns to compress the input data into a compact representation.
- Reconstruction: The decoder learns to reconstruct the original data from this compact representation (see the minimal sketch after this list).
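As a minimal illustration of these three steps, the sketch below trains a tiny fully connected autoencoder. The layer sizes, the 32-dimensional latent space, and the random stand-in data are illustrative assumptions, not the convolutional architecture used later in this assignment.

# Minimal sketch of the encoder-decoder idea; all sizes are illustrative.
import numpy as np
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Dense

inputs = Input(shape=(784,))                         # flattened 28x28 image
latent = Dense(32, activation="relu")(inputs)        # encoder: 784 -> 32
outputs = Dense(784, activation="sigmoid")(latent)   # decoder: 32 -> 784

toy_autoencoder = Model(inputs, outputs)
# The input doubles as the target: minimize mean squared reconstruction error.
toy_autoencoder.compile(optimizer="adam", loss="mse")

x = np.random.rand(64, 784).astype("float32")        # stand-in data
toy_autoencoder.fit(x, x, epochs=1, batch_size=16, verbose=0)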
Uses for Generation
Autoencoders can be used for data generation and several related tasks:
- Denoising: They can remove noise from data by learning to reconstruct clean data from noisy inputs (a short sketch follows this list).
- Anomaly Detection: By learning to reconstruct normal data, autoencoders can identify anomalies as inputs that are poorly reconstructed.
- Generative Models: Variational Autoencoders (VAEs) extend autoencoders to generate new data samples by sampling from the latent space.
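To make the denoising case concrete, here is a hedged sketch that reuses the same toy layout as above; the noise level of 0.2 is an arbitrary illustrative choice, and the data is again a random stand-in.

# Hypothetical denoising setup: noisy inputs, clean targets.
import numpy as np
from tensorflow.keras import Model
from tensorflow.keras.layers import Input, Dense

inputs = Input(shape=(784,))
latent = Dense(32, activation="relu")(inputs)
outputs = Dense(784, activation="sigmoid")(latent)
denoiser = Model(inputs, outputs)
denoiser.compile(optimizer="adam", loss="mse")

x_clean = np.random.rand(64, 784).astype("float32")  # stand-in data
noise = 0.2 * np.random.normal(size=x_clean.shape)   # illustrative noise level
x_noisy = np.clip(x_clean + noise, 0.0, 1.0).astype("float32")
# Noisy input, clean target: the network learns to remove the noise.
denoiser.fit(x_noisy, x_clean, epochs=1, batch_size=16, verbose=0)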
In generative tasks, the decoder part of the autoencoder can be used on its own: feed it latent vectors, often sampled from a distribution such as a Gaussian, and it produces new data points resembling the learned distribution of the training data, as the sketch below illustrates.
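# Hedged sketch of generation: assumes `autoencoder` is a trained instance of
# the Autoencoder class defined below (e.g. loaded via Autoencoder.load("model")).
import numpy as np

num_samples = 16
# Sample latent vectors from a standard Gaussian; the 2 matches the
# latent_space_dim used in this assignment.
latent_samples = np.random.normal(size=(num_samples, 2)).astype("float32")
generated_images = autoencoder.decoder.predict(latent_samples)
# Note: a plain autoencoder does not regularize its latent space, so some
# samples may decode to blurry or implausible digits; VAEs address this.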
import os
import pickle

import numpy as np
from tensorflow.keras import Model
from tensorflow.keras import backend as K
from tensorflow.keras.layers import Input, Conv2D, ReLU, BatchNormalization, \
    Flatten, Dense, Reshape, Conv2DTranspose, Activation
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.optimizers import Adam


class Autoencoder:
    """
    Autoencoder represents a deep convolutional autoencoder architecture with
    mirrored encoder and decoder components.
    """

    def __init__(self,
                 input_shape,
                 conv_filters,
                 conv_kernels,
                 conv_strides,
                 latent_space_dim):
        self.input_shape = input_shape            # e.g. [28, 28, 1]
        self.conv_filters = conv_filters          # e.g. [2, 4, 8]
        self.conv_kernels = conv_kernels          # e.g. [3, 5, 3]
        self.conv_strides = conv_strides          # e.g. [1, 2, 2]
        self.latent_space_dim = latent_space_dim  # e.g. 2

        self.encoder = None
        self.decoder = None
        self.model = None

        self._num_conv_layers = len(conv_filters)
        self._shape_before_bottleneck = None
        self._model_input = None

        self._build()
    def summary(self):
        self.encoder.summary()
        self.decoder.summary()
        self.model.summary()

    def compile(self, learning_rate=0.0001):
        optimizer = Adam(learning_rate=learning_rate)
        mse_loss = MeanSquaredError()
        self.model.compile(optimizer=optimizer, loss=mse_loss)

    def train(self, x_train, batch_size, num_epochs):
        # The input doubles as the target: the network is fit to reproduce
        # its own input, minimizing the mean squared reconstruction error.
        self.model.fit(x_train,
                       x_train,
                       batch_size=batch_size,
                       epochs=num_epochs,
                       shuffle=True)

    def save(self, save_folder="."):
        self._create_folder_if_it_doesnt_exist(save_folder)
        self._save_parameters(save_folder)
        self._save_weights(save_folder)

    def load_weights(self, weights_path):
        self.model.load_weights(weights_path)

    def reconstruct(self, images):
        latent_representations = self.encoder.predict(images)
        reconstructed_images = self.decoder.predict(latent_representations)
        return reconstructed_images, latent_representations
    @classmethod
    def load(cls, save_folder="."):
        parameters_path = os.path.join(save_folder, "parameters.pkl")
        with open(parameters_path, "rb") as f:
            parameters = pickle.load(f)
        autoencoder = Autoencoder(*parameters)
        weights_path = os.path.join(save_folder, "weights.h5")
        autoencoder.load_weights(weights_path)
        return autoencoder

    def _create_folder_if_it_doesnt_exist(self, folder):
        if not os.path.exists(folder):
            os.makedirs(folder)

    def _save_parameters(self, save_folder):
        parameters = [
            self.input_shape,
            self.conv_filters,
            self.conv_kernels,
            self.conv_strides,
            self.latent_space_dim
        ]
        save_path = os.path.join(save_folder, "parameters.pkl")
        with open(save_path, "wb") as f:
            pickle.dump(parameters, f)

    def _save_weights(self, save_folder):
        save_path = os.path.join(save_folder, "weights.h5")
        self.model.save_weights(save_path)
    def _build(self):
        self._build_encoder()
        self._build_decoder()
        self._build_autoencoder()

    def _build_autoencoder(self):
        model_input = self._model_input
        model_output = self.decoder(self.encoder(model_input))
        self.model = Model(model_input, model_output, name="autoencoder")

    def _build_decoder(self):
        decoder_input = self._add_decoder_input()
        dense_layer = self._add_dense_layer(decoder_input)
        reshape_layer = self._add_reshape_layer(dense_layer)
        conv_transpose_layers = self._add_conv_transpose_layers(reshape_layer)
        decoder_output = self._add_decoder_output(conv_transpose_layers)
        self.decoder = Model(decoder_input, decoder_output, name="decoder")

    def _add_decoder_input(self):
        return Input(shape=(self.latent_space_dim,), name="decoder_input")

    def _add_dense_layer(self, decoder_input):
        num_neurons = np.prod(self._shape_before_bottleneck)  # e.g. [1, 2, 4] -> 8
        dense_layer = Dense(num_neurons, name="decoder_dense")(decoder_input)
        return dense_layer

    def _add_reshape_layer(self, dense_layer):
        return Reshape(self._shape_before_bottleneck)(dense_layer)
    def _add_conv_transpose_layers(self, x):
        """Add conv transpose blocks."""
        # Loop through the conv layers in reverse order, skipping the first
        # one: it is mirrored by the separate output layer added below.
        for layer_index in reversed(range(1, self._num_conv_layers)):
            x = self._add_conv_transpose_layer(layer_index, x)
        return x

    def _add_conv_transpose_layer(self, layer_index, x):
        layer_num = self._num_conv_layers - layer_index
        conv_transpose_layer = Conv2DTranspose(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"decoder_conv_transpose_layer_{layer_num}"
        )
        x = conv_transpose_layer(x)
        x = ReLU(name=f"decoder_relu_{layer_num}")(x)
        x = BatchNormalization(name=f"decoder_bn_{layer_num}")(x)
        return x
    def _add_decoder_output(self, x):
        conv_transpose_layer = Conv2DTranspose(
            filters=1,
            kernel_size=self.conv_kernels[0],
            strides=self.conv_strides[0],
            padding="same",
            name=f"decoder_conv_transpose_layer_{self._num_conv_layers}"
        )
        x = conv_transpose_layer(x)
        output_layer = Activation("sigmoid", name="sigmoid_layer")(x)
        return output_layer
    def _build_encoder(self):
        encoder_input = self._add_encoder_input()
        conv_layers = self._add_conv_layers(encoder_input)
        bottleneck = self._add_bottleneck(conv_layers)
        self._model_input = encoder_input
        self.encoder = Model(encoder_input, bottleneck, name="encoder")

    def _add_encoder_input(self):
        return Input(shape=self.input_shape, name="encoder_input")

    def _add_conv_layers(self, encoder_input):
        """Create all convolutional blocks in encoder."""
        x = encoder_input
        for layer_index in range(self._num_conv_layers):
            x = self._add_conv_layer(layer_index, x)
        return x

    def _add_conv_layer(self, layer_index, x):
        """Add a convolutional block to a graph of layers, consisting of
        conv 2d + ReLU + batch normalization.
        """
        layer_number = layer_index + 1
        conv_layer = Conv2D(
            filters=self.conv_filters[layer_index],
            kernel_size=self.conv_kernels[layer_index],
            strides=self.conv_strides[layer_index],
            padding="same",
            name=f"encoder_conv_layer_{layer_number}"
        )
        x = conv_layer(x)
        x = ReLU(name=f"encoder_relu_{layer_number}")(x)
        x = BatchNormalization(name=f"encoder_bn_{layer_number}")(x)
        return x

    def _add_bottleneck(self, x):
        """Flatten data and add bottleneck (Dense layer)."""
        self._shape_before_bottleneck = K.int_shape(x)[1:]
        x = Flatten()(x)
        x = Dense(self.latent_space_dim, name="encoder_output")(x)
        return x
if __name__ == "__main__":
    autoencoder = Autoencoder(
        input_shape=(28, 28, 1),
        conv_filters=(32, 64, 64, 64),
        conv_kernels=(3, 3, 3, 3),
        conv_strides=(1, 2, 2, 1),
        latent_space_dim=2
    )
    autoencoder.summary()
from tensorflow.keras.datasets import mnist

LEARNING_RATE = 0.0005
BATCH_SIZE = 32
EPOCHS = 100


def load_mnist():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    # Scale pixel values to [0, 1] and add a channel dimension.
    x_train = x_train.astype("float32") / 255
    x_train = x_train.reshape(x_train.shape + (1,))
    x_test = x_test.astype("float32") / 255
    x_test = x_test.reshape(x_test.shape + (1,))
    return x_train, y_train, x_test, y_test


def train(x_train, learning_rate, batch_size, epochs):
    autoencoder = Autoencoder(
        input_shape=(28, 28, 1),
        conv_filters=(32, 64, 64, 64),
        conv_kernels=(3, 3, 3, 3),
        conv_strides=(1, 2, 2, 1),
        latent_space_dim=2
    )
    # autoencoder.summary()
    autoencoder.compile(learning_rate)
    autoencoder.train(x_train, batch_size, epochs)
    return autoencoder


if __name__ == "__main__":
    x_train, _, _, _ = load_mnist()
    autoencoder = train(x_train[:10000], LEARNING_RATE, BATCH_SIZE, EPOCHS)
    autoencoder.save("model")
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error
from skimage.metrics import structural_similarity as ssim


def select_images(images, labels, num_images=10):
    # Sample without replacement so the same image is not picked twice.
    sample_images_index = np.random.choice(len(images), num_images,
                                           replace=False)
    sample_images = images[sample_images_index]
    sample_labels = labels[sample_images_index]
    return sample_images, sample_labels


def plot_reconstructed_images(images, reconstructed_images):
    fig = plt.figure(figsize=(15, 3))
    num_images = len(images)
    for i, (image, reconstructed_image) in enumerate(
            zip(images, reconstructed_images)):
        # Originals on the top row, reconstructions on the bottom row.
        image = image.squeeze()
        ax = fig.add_subplot(2, num_images, i + 1)
        ax.axis("off")
        ax.imshow(image, cmap="gray_r")
        reconstructed_image = reconstructed_image.squeeze()
        ax = fig.add_subplot(2, num_images, i + num_images + 1)
        ax.axis("off")
        ax.imshow(reconstructed_image, cmap="gray_r")
    plt.show()


def plot_images_encoded_in_latent_space(latent_representations, sample_labels):
    plt.figure(figsize=(10, 10))
    plt.scatter(latent_representations[:, 0],
                latent_representations[:, 1],
                cmap="rainbow",
                c=sample_labels,
                alpha=0.5,
                s=2)
    plt.colorbar()
    plt.show()
def calculate_metrics(original_images, reconstructed_images):
    """Calculate various metrics for image reconstruction quality."""
    metrics = {}
    # Mean squared error over all pixels
    metrics['mse'] = mean_squared_error(
        original_images.reshape(-1),
        reconstructed_images.reshape(-1)
    )
    # Root mean squared error
    metrics['rmse'] = np.sqrt(metrics['mse'])
    # Mean absolute error
    metrics['mae'] = mean_absolute_error(
        original_images.reshape(-1),
        reconstructed_images.reshape(-1)
    )
    # Structural similarity, averaged over images
    metrics['ssim'] = np.mean([
        ssim(orig.squeeze(), recon.squeeze(), data_range=1.0)
        for orig, recon in zip(original_images, reconstructed_images)
    ])
    return metrics


def evaluate_model(autoencoder, x_test):
    """Evaluate model performance on the test set."""
    reconstructed_images, latent_representations = autoencoder.reconstruct(x_test)
    metrics = calculate_metrics(x_test, reconstructed_images)
    print("\nModel Performance Metrics:")
    print(f"MSE: {metrics['mse']:.6f}")
    print(f"RMSE: {metrics['rmse']:.6f}")
    print(f"MAE: {metrics['mae']:.6f}")
    print(f"SSIM: {metrics['ssim']:.6f}")
    return metrics
if __name__ == "__main__":
    autoencoder = Autoencoder.load("model")
    x_train, y_train, x_test, y_test = load_mnist()

    # Evaluate model performance (a subset of the test set, for speed)
    metrics = evaluate_model(autoencoder, x_test[:1000])

    # Plot a few originals next to their reconstructions
    num_sample_images_to_show = 8
    sample_images, _ = select_images(x_test, y_test, num_sample_images_to_show)
    reconstructed_images, _ = autoencoder.reconstruct(sample_images)
    plot_reconstructed_images(sample_images, reconstructed_images)

    # Plot the 2-D latent space, colored by digit label
    num_images = 6000
    sample_images, sample_labels = select_images(x_test, y_test, num_images)
    _, latent_representations = autoencoder.reconstruct(sample_images)
    plot_images_encoded_in_latent_space(latent_representations, sample_labels)

    # Plot the distribution of pixel-wise reconstruction errors
    reconstructed_test, _ = autoencoder.reconstruct(x_test[:1000])
    pixel_wise_errors = (x_test[:1000] - reconstructed_test) ** 2
    plt.figure(figsize=(10, 5))
    plt.hist(pixel_wise_errors.ravel(), bins=50)
    plt.title('Distribution of Reconstruction Errors')
    plt.xlabel('Squared Error')
    plt.ylabel('Count')
    plt.show()