
DCGAN debugging. Getting just garbage

Introduction:

I am trying to get a CDCGAN (Conditional Deep Convolutional Generative Adversarial Network) to work on the MNIST dataset, which should be fairly easy considering that the library I am using (PyTorch) has a tutorial on its website.
But I can't seem to get it working: it just produces garbage, or the model collapses, or both.

What I tried:

  • making the model conditional (semi-supervised learning)
  • using batch norm
  • using dropout on each layer besides the input/output layer of the generator and discriminator
  • label smoothing to combat overconfidence (see the sketch after this list)
  • adding noise to the images (I guess you would call this instance noise) to get a better data distribution
  • using leaky ReLU to avoid vanishing gradients
  • using a replay buffer to combat forgetting of learned things and overfitting
  • playing with hyperparameters
  • comparing it to the model from the PyTorch tutorial, which does basically what I do, apart from some things like the embedding layer, etc.
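
Below is a minimal, self-contained sketch (assumed example code, not my actual training script) of two of the tricks from the list above: label smoothing for the real targets and instance noise added to the images.

```
# Hedged sketch: one-sided label smoothing and instance noise.
# The helper names are made up for illustration.
import torch

def smooth_real_targets(batch_size):
    # real targets drawn from [0.8, 1.0] instead of a hard 1.0 (label smoothing)
    return torch.empty(batch_size, 1).uniform_(0.8, 1.0)

def add_instance_noise(images, std=0.1):
    # add Gaussian noise to both real and fake images before the discriminator
    return images + std * torch.randn_like(images)

real = torch.randn(8, 1, 64, 64)             # dummy batch of "images"
targets = smooth_real_targets(real.size(0))  # smoothed labels for the real batch
noisy = add_instance_noise(real)             # instance-noised images
```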

Images my Model generated:

Hyperparameters:

batch_size=50, learning_rate_discriminator=0.0001, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, dropout=0.5
[generated sample images]

batch_size=50, learning_rate_discriminator=0.0003, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, dropout=0
[generated sample images]

Images the PyTorch tutorial model generated:

Code for the PyTorch tutorial DCGAN model
As a comparison, here are the images from the DCGAN from the PyTorch tutorial:
[tutorial sample images]

My Code:

```
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms, datasets
import torch.nn.functional as F
from torch import optim as optim
from torch.utils.tensorboard import SummaryWriter

import numpy as np

import os
import time


class Discriminator(torch.nn.Module):
    def __init__(self, ndf=16, dropout_value=0.5):  # ndf: number of discriminator feature maps
        super().__init__()
        self.ndf = ndf
        self.droupout_value = dropout_value

        self.condi = nn.Sequential(
            nn.Linear(in_features=10, out_features=64 * 64)
        )

        self.hidden0 = nn.Sequential(
            nn.Conv2d(in_channels=2, out_channels=self.ndf, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2),
        )
        self.hidden1 = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf, out_channels=self.ndf * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ndf * 2),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )
        self.hidden2 = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf * 2, out_channels=self.ndf * 4, kernel_size=4, stride=2, padding=1, bias=False),
            # nn.BatchNorm2d(self.ndf * 4),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )
        self.hidden3 = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf * 4, out_channels=self.ndf * 8, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ndf * 8),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )
        self.out = nn.Sequential(
            nn.Conv2d(in_channels=self.ndf * 8, out_channels=1, kernel_size=4, stride=1, padding=0, bias=False),
            torch.nn.Sigmoid()
        )

    def forward(self, x, y):
        y = self.condi(y.view(-1, 10))
        y = y.view(-1, 1, 64, 64)

        x = torch.cat((x, y), dim=1)

        x = self.hidden0(x)
        x = self.hidden1(x)
        x = self.hidden2(x)
        x = self.hidden3(x)
        x = self.out(x)

        return x


class Generator(torch.nn.Module):
    def __init__(self, n_features=100, ngf=16, c_channels=1, dropout_value=0.5):  # ngf: number of generator feature maps
        super().__init__()
        self.ngf = ngf
        self.n_features = n_features
        self.c_channels = c_channels
        self.droupout_value = dropout_value

        self.hidden0 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.n_features + 10, out_channels=self.ngf * 8,
                               kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(self.ngf * 8),
            nn.LeakyReLU(0.2)
        )

        self.hidden1 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.ngf * 8, out_channels=self.ngf * 4,
                               kernel_size=4, stride=2, padding=1, bias=False),
            # nn.BatchNorm2d(self.ngf * 4),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )

        self.hidden2 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.ngf * 4, out_channels=self.ngf * 2,
                               kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ngf * 2),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )

        self.hidden3 = nn.Sequential(
            nn.ConvTranspose2d(in_channels=self.ngf * 2, out_channels=self.ngf,
                               kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(self.ngf),
            nn.LeakyReLU(0.2),
            nn.Dropout(self.droupout_value)
        )

        self.out = nn.Sequential(
            # "out_channels=1" because grayscale
            nn.ConvTranspose2d(in_channels=self.ngf, out_channels=1, kernel_size=4,
                               stride=2, padding=1, bias=False),
            nn.Tanh()
        )

    def forward(self, x, y):
        x_cond = torch.cat((x, y), dim=1)  # Combine noise with conditional input (class labels)

        x = self.hidden0(x_cond)           # Input goes into a "ConvTranspose2d" layer
        x = self.hidden1(x)
        x = self.hidden2(x)
        x = self.hidden3(x)
        x = self.out(x)

        return x


class Logger:
    def __init__(self, model_name, model1, model2, m1_optimizer, m2_optimizer, model_parameter, train_loader):
        self.out_dir = "data"
        self.model_name = model_name
        self.train_loader = train_loader
        self.model1 = model1
        self.model2 = model2
        self.model_parameter = model_parameter
        self.m1_optimizer = m1_optimizer
        self.m2_optimizer = m2_optimizer

        # Exclude "Epochs" from the experiment name. This makes sense e.g. when we stop a training run and continue it later on.
        self.experiment_name = '_'.join("{!s}={!r}".format(k, v) for (k, v) in model_parameter.items())\
            .replace("Epochs" + "=" + str(model_parameter["Epochs"]), "")

        self.d_error = 0
        self.g_error = 0

        self.tb = SummaryWriter(log_dir=str(self.out_dir + "/log/" + self.model_name + "/runs/" + self.experiment_name))

        self.path_image = os.path.join(os.getcwd(), f'{self.out_dir}/log/{self.model_name}/images/{self.experiment_name}')
        self.path_model = os.path.join(os.getcwd(), f'{self.out_dir}/log/{self.model_name}/model/{self.experiment_name}')

        try:
            os.makedirs(self.path_image)
        except Exception as e:
            print("WARNING: ", str(e))

        try:
            os.makedirs(self.path_model)
        except Exception as e:
            print("WARNING: ", str(e))

    def log_graph(self, model1_input, model2_input, model1_label, model2_label):
        self.tb.add_graph(self.model1, input_to_model=(model1_input, model1_label))
        self.tb.add_graph(self.model2, input_to_model=(model2_input, model2_label))

    def log(self, num_epoch, d_error, g_error):
        self.d_error = d_error
        self.g_error = g_error

        self.tb.add_scalar("Discriminator Train Error", self.d_error, num_epoch)
        self.tb.add_scalar("Generator Train Error", self.g_error, num_epoch)

    def log_image(self, images, epoch, batch_num):
        grid = torchvision.utils.make_grid(images)
        torchvision.utils.save_image(grid, f'{self.path_image}\\Epoch_{epoch}_batch_{batch_num}.png')

        self.tb.add_image("Generator Image", grid)

    def log_histogramm(self):
        for name, param in self.model2.named_parameters():
            self.tb.add_histogram(name, param, self.model_parameter["Epochs"])
            self.tb.add_histogram(f'gen_{name}.grad', param.grad, self.model_parameter["Epochs"])

        for name, param in self.model1.named_parameters():
            self.tb.add_histogram(name, param, self.model_parameter["Epochs"])
            self.tb.add_histogram(f'dis_{name}.grad', param.grad, self.model_parameter["Epochs"])

    def log_model(self, num_epoch):
        torch.save({
            "epoch": num_epoch,
            "model_generator_state_dict": self.model1.state_dict(),
            "model_discriminator_state_dict": self.model2.state_dict(),
            "optimizer_generator_state_dict": self.m1_optimizer.state_dict(),
            "optimizer_discriminator_state_dict": self.m2_optimizer.state_dict(),
        }, str(self.path_model + f'\\{time.time()}_epoch{num_epoch}.pth'))

    def close(self, logger, images, num_epoch, d_error, g_error):
        logger.log_model(num_epoch)
        logger.log_histogramm()
        logger.log(num_epoch, d_error, g_error)
        self.tb.close()

    def display_stats(self, epoch, batch_num, dis_error, gen_error):
        print(f'Epoch: [{epoch}/{self.model_parameter["Epochs"]}] '
              f'Batch: [{batch_num}/{len(self.train_loader)}] '
              f'Loss_D: {dis_error.data.cpu()}, '
              f'Loss_G: {gen_error.data.cpu()}')


def get_MNIST_dataset(num_workers_loader, model_parameter, out_dir="data"):
    compose = transforms.Compose([
        transforms.Resize((64, 64)),
        transforms.CenterCrop((64, 64)),
        transforms.ToTensor(),
        torchvision.transforms.Normalize(mean=[0.5], std=[0.5])
    ])

    dataset = datasets.MNIST(
        root=out_dir,
        train=True,
        download=True,
        transform=compose
    )

    train_loader = torch.utils.data.DataLoader(dataset,
                                               batch_size=model_parameter["batch_size"],
                                               num_workers=num_workers_loader,
                                               shuffle=model_parameter["shuffle"])

    return dataset, train_loader


def train_discriminator(p_optimizer, p_noise, p_images, p_fake_target, p_real_target, p_images_labels, p_fake_labels, device):
    p_optimizer.zero_grad()

    # 1.1 Train on real data
    pred_dis_real = discriminator(p_images, p_images_labels)
    error_real = loss(pred_dis_real, p_real_target)

    error_real.backward()

    # 1.2 Train on fake data
    fake_data = generator(p_noise, p_fake_labels).detach()
    fake_data = add_noise_to_image(fake_data, device)
    pred_dis_fake = discriminator(fake_data, p_fake_labels)
    error_fake = loss(pred_dis_fake, p_fake_target)

    error_fake.backward()

    p_optimizer.step()

    return error_fake + error_real


def train_generator(p_optimizer, p_noise, p_real_target, p_fake_labels, device):
    p_optimizer.zero_grad()

    fake_images = generator(p_noise, p_fake_labels)
    fake_images = add_noise_to_image(fake_images, device)
    pred_dis_fake = discriminator(fake_images, p_fake_labels)
    error_fake = loss(pred_dis_fake, p_real_target)
    """
    We use "p_real_target" instead of "p_fake_target" because we want to
    maximize that the discriminator is wrong.
    """

    error_fake.backward()

    p_optimizer.step()

    return fake_images, pred_dis_fake, error_fake


# TODO: change to a truncated normal distribution
def get_noise(batch_size, n_features=100):
    return torch.FloatTensor(batch_size, n_features, 1, 1).uniform_(-1, 1)


# We flip the labels of real and fake data. This is said to give better gradient flow.
def get_real_data_target(batch_size):
    return torch.FloatTensor(batch_size, 1, 1, 1).uniform_(0.0, 0.2)


def get_fake_data_target(batch_size):
    return torch.FloatTensor(batch_size, 1, 1, 1).uniform_(0.8, 1.1)


def image_to_vector(images):
    return torch.flatten(images, start_dim=1, end_dim=-1)


def vector_to_image(images):
    return images.view(images.size(0), 1, 28, 28)


def get_rand_labels(batch_size):
    return torch.randint(low=0, high=9, size=(batch_size,))


def load_model(model_load_path):
    if model_load_path:
        checkpoint = torch.load(model_load_path)

        discriminator.load_state_dict(checkpoint["model_discriminator_state_dict"])
        generator.load_state_dict(checkpoint["model_generator_state_dict"])

        dis_opti.load_state_dict(checkpoint["optimizer_discriminator_state_dict"])
        gen_opti.load_state_dict(checkpoint["optimizer_generator_state_dict"])

        return checkpoint["epoch"]

    else:
        return 0


def init_model_optimizer(model_parameter, device):
    # Initialize the models
    discriminator = Discriminator(ndf=model_parameter["ndf"], dropout_value=model_parameter["dropout"]).to(device)
    generator = Generator(ngf=model_parameter["ngf"], dropout_value=model_parameter["dropout"]).to(device)

    # Optimizers
    dis_opti = optim.Adam(discriminator.parameters(), lr=model_parameter["learning_rate_dis"], betas=(0.5, 0.999))
    gen_opti = optim.Adam(generator.parameters(), lr=model_parameter["learning_rate_gen"], betas=(0.5, 0.999))

    return discriminator, generator, dis_opti, gen_opti


def get_hot_vector_encode(labels, device):
    return torch.eye(10)[labels].view(-1, 10, 1, 1).to(device)


def add_noise_to_image(images, device, level_of_noise=0.1):
    return images[0].to(device) + (level_of_noise) * torch.randn(images.shape).to(device)


if __name__ == "__main__":
    # Hyperparameters
    model_parameter = {
        "batch_size": 500,
        "learning_rate_dis": 0.0002,
        "learning_rate_gen": 0.0002,
        "shuffle": False,
        "Epochs": 10,
        "ndf": 64,
        "ngf": 64,
        "dropout": 0.5
    }

    # Parameters
    r_frequent = 10         # How many samples we save for replay per batch (batch_size / r_frequent).
    model_name = "CDCGAN"   # The name of your model, e.g. "Gan"
    num_workers_loader = 1  # How many workers should load the data
    sample_save_size = 16   # How many numbers your saved images should show
    device = "cuda"         # Which device should be used to train the neural network
    model_load_path = ""    # If set, load the model instead of training from scratch
    num_epoch_log = 1       # How frequently you want to log
    torch.manual_seed(43)   # Sets a seed for torch for reproducibility

    dataset_train, train_loader = get_MNIST_dataset(num_workers_loader, model_parameter)  # Get dataset

    # Initialize the models and optimizers
    discriminator, generator, dis_opti, gen_opti = init_model_optimizer(model_parameter, device)

    start_epoch = load_model(model_load_path)  # when we want to load a model

    # Init Logger
    logger = Logger(model_name, generator, discriminator, gen_opti, dis_opti, model_parameter, train_loader)

    loss = nn.BCELoss()

    images, labels = next(iter(train_loader))  # For logging

    # For testing
    # pred = generator(get_noise(model_parameter["batch_size"]).to(device), get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device))
    # dis = discriminator(images.to(device), get_hot_vector_encode(labels, device))

    logger.log_graph(get_noise(model_parameter["batch_size"]).to(device), images.to(device),
                     get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device),
                     get_hot_vector_encode(labels, device))

    # Array to store samples for experience replay
    exp_replay = torch.tensor([]).to(device)

    for num_epoch in range(start_epoch, model_parameter["Epochs"]):
        for batch_num, data_loader in enumerate(train_loader):
            images, labels = data_loader
            images = add_noise_to_image(images, device)  # Add noise to the images

            # 1. Train Discriminator
            dis_error = train_discriminator(
                dis_opti,
                get_noise(model_parameter["batch_size"]).to(device),
                images.to(device),
                get_fake_data_target(model_parameter["batch_size"]).to(device),
                get_real_data_target(model_parameter["batch_size"]).to(device),
                get_hot_vector_encode(labels, device),
                get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device),
                device
            )

            # 2. Train Generator
            fake_image, pred_dis_fake, gen_error = train_generator(
                gen_opti,
                get_noise(model_parameter["batch_size"]).to(device),
                get_real_data_target(model_parameter["batch_size"]).to(device),
                get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device),
                device
            )

            # Store a random point for experience replay
            perm = torch.randperm(fake_image.size(0))
            r_idx = perm[:max(1, int(model_parameter["batch_size"] / r_frequent))]
            r_samples = add_noise_to_image(fake_image[r_idx], device)
            exp_replay = torch.cat((exp_replay, r_samples), 0).detach()

            if exp_replay.size(0) >= model_parameter["batch_size"]:
                # Train on experience-replay data
                dis_opti.zero_grad()

                r_label = get_hot_vector_encode(torch.zeros(exp_replay.size(0)).numpy(), device)
                pred_dis_real = discriminator(exp_replay, r_label)
                error_real = loss(pred_dis_real, get_fake_data_target(exp_replay.size(0)).to(device))

                error_real.backward()

                dis_opti.step()

                print(f'Epoch: [{num_epoch}/{model_parameter["Epochs"]}] '
                      f'Batch: Replay/Experience batch '
                      f'Loss_D: {error_real.data.cpu()}, '
                      )

                exp_replay = torch.tensor([]).to(device)

            logger.display_stats(epoch=num_epoch, batch_num=batch_num, dis_error=dis_error, gen_error=gen_error)

            if batch_num % 100 == 0:
                logger.log_image(fake_image[:sample_save_size], num_epoch, batch_num)

        logger.log(num_epoch, dis_error, gen_error)
        if num_epoch % num_epoch_log == 0:
            logger.log_model(num_epoch)
            logger.log_histogramm()
    logger.close(logger, fake_image[:sample_save_size], num_epoch, dis_error, gen_error)
```

First link to my Code (Pastebin)
Second link to my Code (0bin)

Conclusion:

Since I implemented all of these things (e.g. label smoothing), which are considered beneficial to a GAN/DCGAN, and my model still performs worse than the tutorial DCGAN from PyTorch, I think I might have a bug in my code, but I can't seem to find it.

Reproducibility:

You should be able to just copy the code and run it, provided you have the imported libraries installed, to see for yourself whether you can find anything.

I appreciate any feedback.

asked Feb 26 '20 by Lupos

People also ask

Are GAN and DCGAN the same?

The Deep Convolutional GAN (DCGAN) is another approach to GANs that is especially suited to image data; the particularity of DCGANs is that they use convolution layers in the discriminator and transposed convolution layers in the generator.

How do I fix mode collapse?

A carefully tuned learning rate may mitigate some serious GAN problems like mode collapse. Specifically, lower the learning rate and redo the training when mode collapse happens. We can also experiment with different learning rates for the generator and the discriminator.
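
As a rough illustration (with placeholder models, not the code from the question), giving each network its own optimizer makes it easy to use different learning rates for the generator and discriminator:

```
# Hedged sketch: separate Adam optimizers with different learning rates for G and D.
# G and D are stand-in modules; the rates shown are illustrative, not prescriptive.
import torch.nn as nn
from torch import optim

G = nn.Linear(100, 784)   # placeholder generator
D = nn.Linear(784, 1)     # placeholder discriminator

opt_g = optim.Adam(G.parameters(), lr=1e-4, betas=(0.5, 0.999))
opt_d = optim.Adam(D.parameters(), lr=4e-4, betas=(0.5, 0.999))  # different rate for D
```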

What causes mode collapse?

Mode collapse happens when the generator can only produce a single type of output or a small set of outputs. This may happen due to problems in training, such as the generator finding a type of data that easily fools the discriminator and thus keeps generating that one type.

Why is the DCGAN output much better than that of a vanilla GAN?

This means a DCGAN would likely be more fitting for image/video data, whereas the general idea of a GAN can be applied to wider domains, as the model specifics are left open to be addressed by individual model architectures.


1 Answer

So I solved this issue a while ago, but forgot to post an answer on Stack Overflow. So I will simply post my code here, which should probably work pretty well. Some disclaimers:

  • I am not quite sure if it still works, since I did this a year ago
  • it's for 128x128 px MNIST images
  • it's not a vanilla GAN; I used various optimization techniques
  • if you want to use it you need to change various details, such as the training dataset

Resources:

  • Multi-Scale Gradients (see the sketch after this list)
  • Instance Noise
  • Various tricks I used
  • More tricks
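
As a quick, hedged sketch of the multi-scale gradient idea used in the code below: the discriminator also receives downscaled copies of the image alongside its intermediate feature maps, so the generator gets gradients at several resolutions. The helper name multiscale_inputs is made up for illustration; the full code does the same thing inline with resize2d()/F.adaptive_avg_pool2d().

```
# Minimal sketch of multi-scale gradient inputs, assuming 128x128 images.
import torch
import torch.nn.functional as F

def multiscale_inputs(img):
    # Downscale a (N, C, 128, 128) batch to the resolutions the intermediate
    # discriminator blocks expect (4, 8, 16, 32, 64) and keep the original.
    return [img] + [F.adaptive_avg_pool2d(img, s) for s in (4, 8, 16, 32, 64)]

batch = torch.randn(2, 3, 128, 128)        # dummy image batch
scales = multiscale_inputs(batch)
print([tuple(t.shape) for t in scales])    # [(2, 3, 128, 128), (2, 3, 4, 4), ...]
```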

```

import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

import pytorch_lightning as pl
from pytorch_lightning import loggers

from numpy.random import choice

import os
from pathlib import Path
import shutil

from collections import OrderedDict


# custom weights initialization called on netG and netD
def weights_init(m):
    classname = m.__class__.__name__
    if classname.find('Conv') != -1:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif classname.find('BatchNorm') != -1:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)


# randomly flip some labels
def noisy_labels(y, p_flip=0.05):  # flip labels with 5% probability
    # determine the number of labels to flip
    n_select = int(p_flip * y.shape[0])
    # choose labels to flip
    flip_ix = choice([i for i in range(y.shape[0])], size=n_select)
    # invert the labels in place
    y[flip_ix] = 1 - y[flip_ix]
    return y


class AddGaussianNoise(object):
    def __init__(self, mean=0.0, std=0.1):
        self.std = std
        self.mean = mean

    def __call__(self, tensor):
        tensor = tensor.cuda()
        return tensor + (torch.randn(tensor.size()) * self.std + self.mean).cuda()

    def __repr__(self):
        return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std)


def resize2d(img, size):
    return (F.adaptive_avg_pool2d(img, size).data).cuda()


def get_valid_labels(img):
    return ((0.8 - 1.1) * torch.rand(img.shape[0], 1, 1, 1) + 1.1).cuda()  # soft labels


def get_unvalid_labels(img):
    return (noisy_labels((0.0 - 0.3) * torch.rand(img.shape[0], 1, 1, 1) + 0.3)).cuda()  # soft labels


class Generator(pl.LightningModule):
    def __init__(self, ngf, nc, latent_dim):
        super(Generator, self).__init__()
        self.ngf = ngf
        self.latent_dim = latent_dim
        self.nc = nc

        self.fc0 = nn.Sequential(
            # input is Z, going into a convolution
            nn.utils.spectral_norm(nn.ConvTranspose2d(latent_dim, ngf * 16, 4, 1, 0, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ngf * 16)
        )

        self.fc1 = nn.Sequential(
            # state size. (ngf*8) x 4 x 4
            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 16, ngf * 8, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ngf * 8)
        )

        self.fc2 = nn.Sequential(
            # state size. (ngf*4) x 8 x 8
            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ngf * 4)
        )

        self.fc3 = nn.Sequential(
            # state size. (ngf*2) x 16 x 16
            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ngf * 2)
        )

        self.fc4 = nn.Sequential(
            # state size. (ngf) x 32 x 32
            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ngf)
        )

        self.fc5 = nn.Sequential(
            # state size. (nc) x 64 x 64
            nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)),
            nn.Tanh()
        )
        # state size. (nc) x 128 x 128

        # For Multi-Scale Gradient
        # Converting the intermediate layers into images
        self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1)
        self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1)
        self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1)
        self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1)
        self.fc4_r = nn.Conv2d(ngf, self.nc, 1)

    def forward(self, input):
        x_0 = self.fc0(input)
        x_1 = self.fc1(x_0)
        x_2 = self.fc2(x_1)
        x_3 = self.fc3(x_2)
        x_4 = self.fc4(x_3)
        x_5 = self.fc5(x_4)

        # For Multi-Scale Gradient
        # Converting the intermediate layers into images
        x_0_r = self.fc0_r(x_0)
        x_1_r = self.fc1_r(x_1)
        x_2_r = self.fc2_r(x_2)
        x_3_r = self.fc3_r(x_3)
        x_4_r = self.fc4_r(x_4)

        return x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r


class Discriminator(pl.LightningModule):
    def __init__(self, ndf, nc):
        super(Discriminator, self).__init__()
        self.nc = nc
        self.ndf = ndf

        self.fc0 = nn.Sequential(
            # input is (nc) x 128 x 128
            nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True)
        )

        self.fc1 = nn.Sequential(
            # state size. (ndf) x 64 x 64
            nn.utils.spectral_norm(nn.Conv2d(ndf + nc, ndf * 2, 4, 2, 1, bias=False)),
            # "+ nc" because of multi-scale gradient
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ndf * 2)
        )

        self.fc2 = nn.Sequential(
            # state size. (ndf*2) x 32 x 32
            nn.utils.spectral_norm(nn.Conv2d(ndf * 2 + nc, ndf * 4, 4, 2, 1, bias=False)),
            # "+ nc" because of multi-scale gradient
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ndf * 4)
        )

        self.fc3 = nn.Sequential(
            # state size. (ndf*4) x 16 x 16
            nn.utils.spectral_norm(nn.Conv2d(ndf * 4 + nc, ndf * 8, 4, 2, 1, bias=False)),
            # "+ nc" because of multi-scale gradient
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ndf * 8),
        )

        self.fc4 = nn.Sequential(
            # state size. (ndf*8) x 8 x 8
            nn.utils.spectral_norm(nn.Conv2d(ndf * 8 + nc, ndf * 16, 4, 2, 1, bias=False)),
            nn.LeakyReLU(0.2, inplace=True),
            nn.BatchNorm2d(ndf * 16)
        )

        self.fc5 = nn.Sequential(
            # state size. (ndf*16) x 4 x 4
            nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)),
            nn.Sigmoid()
        )
        # state size. 1 x 1 x 1

    def forward(self, input, detach_or_not):
        # When we train in combination with the generator we use multi-scale gradient.
        x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input
        if detach_or_not:
            x = x.detach()

        x_0 = self.fc0(x)

        x_0 = torch.cat((x_0, x_4_r), dim=1)  # Concat multi-scale gradient
        x_1 = self.fc1(x_0)

        x_1 = torch.cat((x_1, x_3_r), dim=1)  # Concat multi-scale gradient
        x_2 = self.fc2(x_1)

        x_2 = torch.cat((x_2, x_2_r), dim=1)  # Concat multi-scale gradient
        x_3 = self.fc3(x_2)

        x_3 = torch.cat((x_3, x_1_r), dim=1)  # Concat multi-scale gradient
        x_4 = self.fc4(x_3)

        x_4 = torch.cat((x_4, x_0_r), dim=1)  # Concat multi-scale gradient
        x_5 = self.fc5(x_4)

        return x_5


class DCGAN(pl.LightningModule):

    def __init__(self, hparams, checkpoint_folder, experiment_name):
        super().__init__()
        self.hparams = hparams
        self.checkpoint_folder = checkpoint_folder
        self.experiment_name = experiment_name

        # networks
        self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim)
        self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc)
        self.generator.apply(weights_init)
        self.discriminator.apply(weights_init)

        # cache for generated images
        self.generated_imgs = None
        self.last_imgs = None

        # For experience replay
        self.exp_replay_dis = torch.tensor([])

    def forward(self, z):
        return self.generator(z)

    def adversarial_loss(self, y_hat, y):
        return F.binary_cross_entropy(y_hat, y)

    def training_step(self, batch, batch_nb, optimizer_idx):
        # For adding instance noise; for more visit: https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/
        std_gaussian = max(0, self.hparams.level_of_noise - (
                (self.hparams.level_of_noise * 2) * (self.current_epoch / self.hparams.epochs)))
        AddGaussianNoiseInst = AddGaussianNoise(std=std_gaussian)  # the noise decays over time

        imgs, _ = batch
        imgs = AddGaussianNoiseInst(imgs)  # Adding instance noise to real images
        self.last_imgs = imgs

        # train generator
        if optimizer_idx == 0:
            # sample noise
            z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1).cuda()

            # generate images
            self.generated_imgs = self(z)

            # ground truth result (i.e. all fake)
            # adversarial loss is binary cross-entropy; [0] is the image of the last layer
            g_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, False),
                                           get_valid_labels(self.generated_imgs[0]))

            tqdm_dict = {'g_loss': g_loss}
            log = {'g_loss': g_loss, "std_gaussian": std_gaussian}
            output = OrderedDict({
                'loss': g_loss,
                'progress_bar': tqdm_dict,
                'log': log
            })
            return output

        # train discriminator
        if optimizer_idx == 1:
            # Measure discriminator's ability to classify real from generated samples
            # how well can it label as real?
            real_loss = self.adversarial_loss(
                self.discriminator([imgs, resize2d(imgs, 4), resize2d(imgs, 8), resize2d(imgs, 16), resize2d(imgs, 32), resize2d(imgs, 64)],
                                   False), get_valid_labels(imgs))

            # how well can it label as fake?; [0] is the image of the last layer
            fake_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, True),
                                              get_unvalid_labels(self.generated_imgs[0]))

            # discriminator loss is the average of these
            d_loss = (real_loss + fake_loss) / 2

            tqdm_dict = {'d_loss': d_loss}
            log = {'d_loss': d_loss, "std_gaussian": std_gaussian}
            output = OrderedDict({
                'loss': d_loss,
                'progress_bar': tqdm_dict,
                'log': log
            })
            return output

    def configure_optimizers(self):
        lr_gen = self.hparams.lr_gen
        lr_dis = self.hparams.lr_dis
        b1 = self.hparams.b1
        b2 = self.hparams.b2

        opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr_gen, betas=(b1, b2))
        opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr_dis, betas=(b1, b2))
        return [opt_g, opt_d], []

    def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None:
        loss.backward(retain_graph=True)

    def train_dataloader(self):
        # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
        #                                 transforms.ToTensor(),
        #                                 transforms.Normalize([0.5], [0.5])])
        # dataset = torchvision.datasets.MNIST(os.getcwd(), train=False, download=True, transform=transform)
        # return DataLoader(dataset, batch_size=self.hparams.batch_size)

        # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
        #                                 transforms.ToTensor(),
        #                                 transforms.Normalize([0.5], [0.5])
        #                                 ])
        # train_dataset = torchvision.datasets.ImageFolder(
        #     root="./drive/My Drive/datasets/flower_dataset/",
        #     # root="./drive/My Drive/datasets/ghibli_dataset_small_overfit/",
        #     transform=transform
        # )
        # return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
        #                   batch_size=self.hparams.batch_size)

        transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)),
                                        transforms.ToTensor(),
                                        transforms.Normalize([0.5], [0.5])
                                        ])
        train_dataset = torchvision.datasets.ImageFolder(
            root="ghibli_dataset_small_overfit/",
            transform=transform
        )
        return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True,
                          batch_size=self.hparams.batch_size)

    def on_epoch_end(self):
        z = torch.randn(4, self.hparams.latent_dim, 1, 1).cuda()
        # match gpu device (or keep as cpu)
        if self.on_gpu:
            z = z.cuda(self.last_imgs.device.index)

        # log sampled images
        sample_imgs = self.generator(z)[0]
        torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png')

        # save model
        if self.current_epoch % self.hparams.save_model_every_epoch == 0:
            trainer.save_checkpoint(
                self.checkpoint_folder + "/" + self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt")


from argparse import Namespace

args = {
    'batch_size': 128,  # batch size
    'lr_gen': 0.0003,  # TTUR; learning rate of the networks; tested value: 0.0002
    'lr_dis': 0.0003,  # TTUR; learning rate of the networks; tested value: 0.0002
    'b1': 0.5,  # Momentum for Adam; tested value (DCGAN paper): 0.5
    'b2': 0.999,  # Momentum for Adam; tested value (DCGAN paper): 0.999
    'latent_dim': 256,  # tested value which worked (in V4_1): 100
    'nc': 3,  # number of color channels
    'ndf': 8,  # number of discriminator features
    'ngf': 8,  # number of generator features
    'epochs': 4,  # the maximal amount of epochs the algorithm should run
    'save_model_every_epoch': 1,  # how often we save our model
    'image_size': 128,  # size of the image
    'num_workers': 3,
    'level_of_noise': 0.1,  # how much instance noise we introduce (std; tested values: 0.15 and 0.1)
    'experience_save_per_batch': 1,  # this value should be very low; tested value which works: 1
    'experience_batch_size': 50  # this value shouldn't be too high; tested value which works: 50
}
hparams = Namespace(**args)

# Parameters
experiment_name = "DCGAN_6_2_MNIST_128px"
dataset_name = "mnist"
checkpoint_folder = "DCGAN/"
tags = ["DCGAN", "128x128"]
dirpath = Path(checkpoint_folder)

# defining net
net = DCGAN(hparams, checkpoint_folder, experiment_name)

torch.autograd.set_detect_anomaly(True)
trainer = pl.Trainer(
    # resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt",
    max_epochs=args["epochs"],
    gpus=1
)

trainer.fit(net)

```

answered Oct 07 '22 by Lupos