Introduction:
I am trying to get a CDCGAN (Conditional Deep Convolutional Generative Adversarial Network) to work on the MNIST dataset which should be fairly easy considering that the library (PyTorch) I am using has a tutorial on its website.
But I can't seem to get It working it just produces garbage or the model collapses or both.
What I tried:
Images my Model generated:
Hyperparameters:
batch_size=50, learning_rate_discrimiantor=0.0001, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, droupout=0.5
batch_size=50, learning_rate_discriminator=0.0003, learning_rate_generator=0.0003, shuffle=True, ndf=64, ngf=64, dropout=0
Images Pytorch tutorial Model generated:
Code for the pytorch tutorial dcgan model
As comparison here are the images from the DCGAN from the pytorch turoial:
My Code:
import torch import torch.nn as nn import torchvision from torchvision import transforms, datasets import torch.nn.functional as F from torch import optim as optim from torch.utils.tensorboard import SummaryWriter import numpy as np import os import time class Discriminator(torch.nn.Module): def __init__(self, ndf=16, dropout_value=0.5): # ndf feature map discriminator super().__init__() self.ndf = ndf self.droupout_value = dropout_value self.condi = nn.Sequential( nn.Linear(in_features=10, out_features=64 * 64) ) self.hidden0 = nn.Sequential( nn.Conv2d(in_channels=2, out_channels=self.ndf, kernel_size=4, stride=2, padding=1, bias=False), nn.LeakyReLU(0.2), ) self.hidden1 = nn.Sequential( nn.Conv2d(in_channels=self.ndf, out_channels=self.ndf * 2, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(self.ndf * 2), nn.LeakyReLU(0.2), nn.Dropout(self.droupout_value) ) self.hidden2 = nn.Sequential( nn.Conv2d(in_channels=self.ndf * 2, out_channels=self.ndf * 4, kernel_size=4, stride=2, padding=1, bias=False), #nn.BatchNorm2d(self.ndf * 4), nn.LeakyReLU(0.2), nn.Dropout(self.droupout_value) ) self.hidden3 = nn.Sequential( nn.Conv2d(in_channels=self.ndf * 4, out_channels=self.ndf * 8, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(self.ndf * 8), nn.LeakyReLU(0.2), nn.Dropout(self.droupout_value) ) self.out = nn.Sequential( nn.Conv2d(in_channels=self.ndf * 8, out_channels=1, kernel_size=4, stride=1, padding=0, bias=False), torch.nn.Sigmoid() ) def forward(self, x, y): y = self.condi(y.view(-1, 10)) y = y.view(-1, 1, 64, 64) x = torch.cat((x, y), dim=1) x = self.hidden0(x) x = self.hidden1(x) x = self.hidden2(x) x = self.hidden3(x) x = self.out(x) return x class Generator(torch.nn.Module): def __init__(self, n_features=100, ngf=16, c_channels=1, dropout_value=0.5): # ngf feature map of generator super().__init__() self.ngf = ngf self.n_features = n_features self.c_channels = c_channels self.droupout_value = dropout_value self.hidden0 = nn.Sequential( nn.ConvTranspose2d(in_channels=self.n_features + 10, out_channels=self.ngf * 8, kernel_size=4, stride=1, padding=0, bias=False), nn.BatchNorm2d(self.ngf * 8), nn.LeakyReLU(0.2) ) self.hidden1 = nn.Sequential( nn.ConvTranspose2d(in_channels=self.ngf * 8, out_channels=self.ngf * 4, kernel_size=4, stride=2, padding=1, bias=False), #nn.BatchNorm2d(self.ngf * 4), nn.LeakyReLU(0.2), nn.Dropout(self.droupout_value) ) self.hidden2 = nn.Sequential( nn.ConvTranspose2d(in_channels=self.ngf * 4, out_channels=self.ngf * 2, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(self.ngf * 2), nn.LeakyReLU(0.2), nn.Dropout(self.droupout_value) ) self.hidden3 = nn.Sequential( nn.ConvTranspose2d(in_channels=self.ngf * 2, out_channels=self.ngf, kernel_size=4, stride=2, padding=1, bias=False), nn.BatchNorm2d(self.ngf), nn.LeakyReLU(0.2), nn.Dropout(self.droupout_value) ) self.out = nn.Sequential( # "out_channels=1" because gray scale nn.ConvTranspose2d(in_channels=self.ngf, out_channels=1, kernel_size=4, stride=2, padding=1, bias=False), nn.Tanh() ) def forward(self, x, y): x_cond = torch.cat((x, y), dim=1) # Combine flatten image with conditional input (class labels) x = self.hidden0(x_cond) # Image goes into a "ConvTranspose2d" layer x = self.hidden1(x) x = self.hidden2(x) x = self.hidden3(x) x = self.out(x) return x class Logger: def __init__(self, model_name, model1, model2, m1_optimizer, m2_optimizer, model_parameter, train_loader): self.out_dir = "data" self.model_name = model_name self.train_loader = train_loader self.model1 = model1 self.model2 = model2 self.model_parameter = model_parameter self.m1_optimizer = m1_optimizer self.m2_optimizer = m2_optimizer # Exclude Epochs of the model name. This make sense e.g. when we stop a training progress and continue later on. self.experiment_name = '_'.join("{!s}={!r}".format(k, v) for (k, v) in model_parameter.items())\ .replace("Epochs" + "=" + str(model_parameter["Epochs"]), "") self.d_error = 0 self.g_error = 0 self.tb = SummaryWriter(log_dir=str(self.out_dir + "/log/" + self.model_name + "/runs/" + self.experiment_name)) self.path_image = os.path.join(os.getcwd(), f'{self.out_dir}/log/{self.model_name}/images/{self.experiment_name}') self.path_model = os.path.join(os.getcwd(), f'{self.out_dir}/log/{self.model_name}/model/{self.experiment_name}') try: os.makedirs(self.path_image) except Exception as e: print("WARNING: ", str(e)) try: os.makedirs(self.path_model) except Exception as e: print("WARNING: ", str(e)) def log_graph(self, model1_input, model2_input, model1_label, model2_label): self.tb.add_graph(self.model1, input_to_model=(model1_input, model1_label)) self.tb.add_graph(self.model2, input_to_model=(model2_input, model2_label)) def log(self, num_epoch, d_error, g_error): self.d_error = d_error self.g_error = g_error self.tb.add_scalar("Discriminator Train Error", self.d_error, num_epoch) self.tb.add_scalar("Generator Train Error", self.g_error, num_epoch) def log_image(self, images, epoch, batch_num): grid = torchvision.utils.make_grid(images) torchvision.utils.save_image(grid, f'{self.path_image}\\Epoch_{epoch}_batch_{batch_num}.png') self.tb.add_image("Generator Image", grid) def log_histogramm(self): for name, param in self.model2.named_parameters(): self.tb.add_histogram(name, param, self.model_parameter["Epochs"]) self.tb.add_histogram(f'gen_{name}.grad', param.grad, self.model_parameter["Epochs"]) for name, param in self.model1.named_parameters(): self.tb.add_histogram(name, param, self.model_parameter["Epochs"]) self.tb.add_histogram(f'dis_{name}.grad', param.grad, self.model_parameter["Epochs"]) def log_model(self, num_epoch): torch.save({ "epoch": num_epoch, "model_generator_state_dict": self.model1.state_dict(), "model_discriminator_state_dict": self.model2.state_dict(), "optimizer_generator_state_dict": self.m1_optimizer.state_dict(), "optimizer_discriminator_state_dict": self.m2_optimizer.state_dict(), }, str(self.path_model + f'\\{time.time()}_epoch{num_epoch}.pth')) def close(self, logger, images, num_epoch, d_error, g_error): logger.log_model(num_epoch) logger.log_histogramm() logger.log(num_epoch, d_error, g_error) self.tb.close() def display_stats(self, epoch, batch_num, dis_error, gen_error): print(f'Epoch: [{epoch}/{self.model_parameter["Epochs"]}] ' f'Batch: [{batch_num}/{len(self.train_loader)}] ' f'Loss_D: {dis_error.data.cpu()}, ' f'Loss_G: {gen_error.data.cpu()}') def get_MNIST_dataset(num_workers_loader, model_parameter, out_dir="data"): compose = transforms.Compose([ transforms.Resize((64, 64)), transforms.CenterCrop((64, 64)), transforms.ToTensor(), torchvision.transforms.Normalize(mean=[0.5], std=[0.5]) ]) dataset = datasets.MNIST( root=out_dir, train=True, download=True, transform=compose ) train_loader = torch.utils.data.DataLoader(dataset, batch_size=model_parameter["batch_size"], num_workers=num_workers_loader, shuffle=model_parameter["shuffle"]) return dataset, train_loader def train_discriminator(p_optimizer, p_noise, p_images, p_fake_target, p_real_target, p_images_labels, p_fake_labels, device): p_optimizer.zero_grad() # 1.1 Train on real data pred_dis_real = discriminator(p_images, p_images_labels) error_real = loss(pred_dis_real, p_real_target) error_real.backward() # 1.2 Train on fake data fake_data = generator(p_noise, p_fake_labels).detach() fake_data = add_noise_to_image(fake_data, device) pred_dis_fake = discriminator(fake_data, p_fake_labels) error_fake = loss(pred_dis_fake, p_fake_target) error_fake.backward() p_optimizer.step() return error_fake + error_real def train_generator(p_optimizer, p_noise, p_real_target, p_fake_labels, device): p_optimizer.zero_grad() fake_images = generator(p_noise, p_fake_labels) fake_images = add_noise_to_image(fake_images, device) pred_dis_fake = discriminator(fake_images, p_fake_labels) error_fake = loss(pred_dis_fake, p_real_target) # because """ We use "p_real_target" instead of "p_fake_target" because we want to maximize that the discriminator is wrong. """ error_fake.backward() p_optimizer.step() return fake_images, pred_dis_fake, error_fake # TODO change to a Truncated normal distribution def get_noise(batch_size, n_features=100): return torch.FloatTensor(batch_size, n_features, 1, 1).uniform_(-1, 1) # We flip label of real and fate data. Better gradient flow I have told def get_real_data_target(batch_size): return torch.FloatTensor(batch_size, 1, 1, 1).uniform_(0.0, 0.2) def get_fake_data_target(batch_size): return torch.FloatTensor(batch_size, 1, 1, 1).uniform_(0.8, 1.1) def image_to_vector(images): return torch.flatten(images, start_dim=1, end_dim=-1) def vector_to_image(images): return images.view(images.size(0), 1, 28, 28) def get_rand_labels(batch_size): return torch.randint(low=0, high=9, size=(batch_size,)) def load_model(model_load_path): if model_load_path: checkpoint = torch.load(model_load_path) discriminator.load_state_dict(checkpoint["model_discriminator_state_dict"]) generator.load_state_dict(checkpoint["model_generator_state_dict"]) dis_opti.load_state_dict(checkpoint["optimizer_discriminator_state_dict"]) gen_opti.load_state_dict(checkpoint["optimizer_generator_state_dict"]) return checkpoint["epoch"] else: return 0 def init_model_optimizer(model_parameter, device): # Initialize the Models discriminator = Discriminator(ndf=model_parameter["ndf"], dropout_value=model_parameter["dropout"]).to(device) generator = Generator(ngf=model_parameter["ngf"], dropout_value=model_parameter["dropout"]).to(device) # train dis_opti = optim.Adam(discriminator.parameters(), lr=model_parameter["learning_rate_dis"], betas=(0.5, 0.999)) gen_opti = optim.Adam(generator.parameters(), lr=model_parameter["learning_rate_gen"], betas=(0.5, 0.999)) return discriminator, generator, dis_opti, gen_opti def get_hot_vector_encode(labels, device): return torch.eye(10)[labels].view(-1, 10, 1, 1).to(device) def add_noise_to_image(images, device, level_of_noise=0.1): return images[0].to(device) + (level_of_noise) * torch.randn(images.shape).to(device) if __name__ == "__main__": # Hyperparameter model_parameter = { "batch_size": 500, "learning_rate_dis": 0.0002, "learning_rate_gen": 0.0002, "shuffle": False, "Epochs": 10, "ndf": 64, "ngf": 64, "dropout": 0.5 } # Parameter r_frequent = 10 # How many samples we save for replay per batch (batch_size / r_frequent). model_name = "CDCGAN" # The name of you model e.g. "Gan" num_workers_loader = 1 # How many workers should load the data sample_save_size = 16 # How many numbers your saved imaged should show device = "cuda" # Which device should be used to train the neural network model_load_path = "" # If set load model instead of training from new num_epoch_log = 1 # How frequent you want to log/ torch.manual_seed(43) # Sets a seed for torch for reproducibility dataset_train, train_loader = get_MNIST_dataset(num_workers_loader, model_parameter) # Get dataset # Initialize the Models and optimizer discriminator, generator, dis_opti, gen_opti = init_model_optimizer(model_parameter, device) # Init model/Optimizer start_epoch = load_model(model_load_path) # when we want to load a model # Init Logger logger = Logger(model_name, generator, discriminator, gen_opti, dis_opti, model_parameter, train_loader) loss = nn.BCELoss() images, labels = next(iter(train_loader)) # For logging # For testing # pred = generator(get_noise(model_parameter["batch_size"]).to(device), get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device)) # dis = discriminator(images.to(device), get_hot_vector_encode(labels, device)) logger.log_graph(get_noise(model_parameter["batch_size"]).to(device), images.to(device), get_hot_vector_encode(get_rand_labels(model_parameter["batch_size"]), device), get_hot_vector_encode(labels, device)) # Array to store exp_replay = torch.tensor([]).to(device) for num_epoch in range(start_epoch, model_parameter["Epochs"]): for batch_num, data_loader in enumerate(train_loader): images, labels = data_loader images = add_noise_to_image(images, device) # Add noise to the images # 1. Train Discriminator dis_error = train_discriminator( dis_opti, get_noise(model_parameter["batch_size"]).to(device), images.to(device), get_fake_data_target(model_parameter["batch_size"]).to(device), get_real_data_target(model_parameter["batch_size"]).to(device), get_hot_vector_encode(labels, device), get_hot_vector_encode( get_rand_labels(model_parameter["batch_size"]), device), device ) # 2. Train Generator fake_image, pred_dis_fake, gen_error = train_generator( gen_opti, get_noise(model_parameter["batch_size"]).to(device), get_real_data_target(model_parameter["batch_size"]).to(device), get_hot_vector_encode( get_rand_labels(model_parameter["batch_size"]), device), device ) # Store a random point for experience replay perm = torch.randperm(fake_image.size(0)) r_idx = perm[:max(1, int(model_parameter["batch_size"] / r_frequent))] r_samples = add_noise_to_image(fake_image[r_idx], device) exp_replay = torch.cat((exp_replay, r_samples), 0).detach() if exp_replay.size(0) >= model_parameter["batch_size"]: # Train on experienced data dis_opti.zero_grad() r_label = get_hot_vector_encode(torch.zeros(exp_replay.size(0)).numpy(), device) pred_dis_real = discriminator(exp_replay, r_label) error_real = loss(pred_dis_real, get_fake_data_target(exp_replay.size(0)).to(device)) error_real.backward() dis_opti.step() print(f'Epoch: [{num_epoch}/{model_parameter["Epochs"]}] ' f'Batch: Replay/Experience batch ' f'Loss_D: {error_real.data.cpu()}, ' ) exp_replay = torch.tensor([]).to(device) logger.display_stats(epoch=num_epoch, batch_num=batch_num, dis_error=dis_error, gen_error=gen_error) if batch_num % 100 == 0: logger.log_image(fake_image[:sample_save_size], num_epoch, batch_num) logger.log(num_epoch, dis_error, gen_error) if num_epoch % num_epoch_log == 0: logger.log_model(num_epoch) logger.log_histogramm() logger.close(logger, fake_image[:sample_save_size], num_epoch, dis_error, gen_error)
First link to my Code (Pastebin)
Second link to my Code (0bin)
Conclusion:
Since I implemented all these things (e.g. label smoothing) which are considered beneficial to a GAN/DCGAN.
And my Model still performs worse than the Tutorial DCGAN from PyTorch I think I might have a bug in my code but I can't seem to find it.
Reproducibility:
You should be able to just copy the code and run it if you have the libraries that I imported installed to look for yourself if you can find anything.
I appreciate any feedback.
The Deep Convolutional GAN (DCGAN) is another approche of GAN that is specially used for image data, the particulatity of DCGAN's is that they use convolution layers in the discriminator and transpose convolution layers for the generator.
A carefully tunned learning rate may mitigate some serious GAN's problems like mode collapse. In specific, lower the learning rate and redo the training when mode collapse happens. We can also experiment with different learning rates for the generator and the discriminator.
Mode collapse happens when the generator can only produce a single type of output or a small set of outputs. This may happen due to problems in training, such as the generator finds a type of data that is easily able to fool the discriminator and thus keeps generating that one type.
This means a DCGAN would likely be more fitting for image/video data, whereas the general idea of a GAN can be applied to wider domains, as the model specifics are left open to be addressed by individual model architectures.
So I solved this issue a while ago, but forgot to post an answer on stack overflow. So I will simply post my code here which should work probably pretty good. Some disclaimer:
Resources:
``
import torch from torch.autograd import Variable import torch.nn as nn import torch.nn.functional as F import torchvision import torchvision.transforms as transforms from torch.utils.data import DataLoader import pytorch_lightning as pl from pytorch_lightning import loggers from numpy.random import choice import os from pathlib import Path import shutil from collections import OrderedDict # custom weights initialization called on netG and netD def weights_init(m): classname = m.__class__.__name__ if classname.find('Conv') != -1: nn.init.normal_(m.weight.data, 0.0, 0.02) elif classname.find('BatchNorm') != -1: nn.init.normal_(m.weight.data, 1.0, 0.02) nn.init.constant_(m.bias.data, 0) # randomly flip some labels def noisy_labels(y, p_flip=0.05): # # flip labels with 5% probability # determine the number of labels to flip n_select = int(p_flip * y.shape[0]) # choose labels to flip flip_ix = choice([i for i in range(y.shape[0])], size=n_select) # invert the labels in place y[flip_ix] = 1 - y[flip_ix] return y class AddGaussianNoise(object): def __init__(self, mean=0.0, std=0.1): self.std = std self.mean = mean def __call__(self, tensor): tensor = tensor.cuda() return tensor + (torch.randn(tensor.size()) * self.std + self.mean).cuda() def __repr__(self): return self.__class__.__name__ + '(mean={0}, std={1})'.format(self.mean, self.std) def resize2d(img, size): return (F.adaptive_avg_pool2d(img, size).data).cuda() def get_valid_labels(img): return ((0.8 - 1.1) * torch.rand(img.shape[0], 1, 1, 1) + 1.1).cuda() # soft labels def get_unvalid_labels(img): return (noisy_labels((0.0 - 0.3) * torch.rand(img.shape[0], 1, 1, 1) + 0.3)).cuda() # soft labels class Generator(pl.LightningModule): def __init__(self, ngf, nc, latent_dim): super(Generator, self).__init__() self.ngf = ngf self.latent_dim = latent_dim self.nc = nc self.fc0 = nn.Sequential( # input is Z, going into a convolution nn.utils.spectral_norm(nn.ConvTranspose2d(latent_dim, ngf * 16, 4, 1, 0, bias=False)), nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ngf * 16) ) self.fc1 = nn.Sequential( # state size. (ngf*8) x 4 x 4 nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 16, ngf * 8, 4, 2, 1, bias=False)), nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ngf * 8) ) self.fc2 = nn.Sequential( # state size. (ngf*4) x 8 x 8 nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 8, ngf * 4, 4, 2, 1, bias=False)), nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ngf * 4) ) self.fc3 = nn.Sequential( # state size. (ngf*2) x 16 x 16 nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 4, ngf * 2, 4, 2, 1, bias=False)), nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ngf * 2) ) self.fc4 = nn.Sequential( # state size. (ngf) x 32 x 32 nn.utils.spectral_norm(nn.ConvTranspose2d(ngf * 2, ngf, 4, 2, 1, bias=False)), nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ngf) ) self.fc5 = nn.Sequential( # state size. (nc) x 64 x 64 nn.utils.spectral_norm(nn.ConvTranspose2d(ngf, nc, 4, 2, 1, bias=False)), nn.Tanh() ) # state size. (nc) x 128 x 128 # For Multi-Scale Gradient # Converting the intermediate layers into images self.fc0_r = nn.Conv2d(ngf * 16, self.nc, 1) self.fc1_r = nn.Conv2d(ngf * 8, self.nc, 1) self.fc2_r = nn.Conv2d(ngf * 4, self.nc, 1) self.fc3_r = nn.Conv2d(ngf * 2, self.nc, 1) self.fc4_r = nn.Conv2d(ngf, self.nc, 1) def forward(self, input): x_0 = self.fc0(input) x_1 = self.fc1(x_0) x_2 = self.fc2(x_1) x_3 = self.fc3(x_2) x_4 = self.fc4(x_3) x_5 = self.fc5(x_4) # For Multi-Scale Gradient # Converting the intermediate layers into images x_0_r = self.fc0_r(x_0) x_1_r = self.fc1_r(x_1) x_2_r = self.fc2_r(x_2) x_3_r = self.fc3_r(x_3) x_4_r = self.fc4_r(x_4) return x_5, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r class Discriminator(pl.LightningModule): def __init__(self, ndf, nc): super(Discriminator, self).__init__() self.nc = nc self.ndf = ndf self.fc0 = nn.Sequential( # input is (nc) x 128 x 128 nn.utils.spectral_norm(nn.Conv2d(nc, ndf, 4, 2, 1, bias=False)), nn.LeakyReLU(0.2, inplace=True) ) self.fc1 = nn.Sequential( # state size. (ndf) x 64 x 64 nn.utils.spectral_norm(nn.Conv2d(ndf + nc, ndf * 2, 4, 2, 1, bias=False)), # "+ nc" because of multi scale gradient nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ndf * 2) ) self.fc2 = nn.Sequential( # state size. (ndf*2) x 32 x 32 nn.utils.spectral_norm(nn.Conv2d(ndf * 2 + nc, ndf * 4, 4, 2, 1, bias=False)), # "+ nc" because of multi scale gradient nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ndf * 4) ) self.fc3 = nn.Sequential( # state size. (ndf*4) x 16 x 16e nn.utils.spectral_norm(nn.Conv2d(ndf * 4 + nc, ndf * 8, 4, 2, 1, bias=False)), # "+ nc" because of multi scale gradient nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ndf * 8), ) self.fc4 = nn.Sequential( # state size. (ndf*8) x 8 x 8 nn.utils.spectral_norm(nn.Conv2d(ndf * 8 + nc, ndf * 16, 4, 2, 1, bias=False)), nn.LeakyReLU(0.2, inplace=True), nn.BatchNorm2d(ndf * 16) ) self.fc5 = nn.Sequential( # state size. (ndf*8) x 4 x 4 nn.utils.spectral_norm(nn.Conv2d(ndf * 16 + nc, 1, 4, 1, 0, bias=False)), nn.Sigmoid() ) # state size. 1 x 1 x 1 def forward(self, input, detach_or_not): # When we train i ncombination with generator we use multi scale gradient. x, x_0_r, x_1_r, x_2_r, x_3_r, x_4_r = input if detach_or_not: x = x.detach() x_0 = self.fc0(x) x_0 = torch.cat((x_0, x_4_r), dim=1) # Concat Multi-Scale Gradient x_1 = self.fc1(x_0) x_1 = torch.cat((x_1, x_3_r), dim=1) # Concat Multi-Scale Gradient x_2 = self.fc2(x_1) x_2 = torch.cat((x_2, x_2_r), dim=1) # Concat Multi-Scale Gradient x_3 = self.fc3(x_2) x_3 = torch.cat((x_3, x_1_r), dim=1) # Concat Multi-Scale Gradient x_4 = self.fc4(x_3) x_4 = torch.cat((x_4, x_0_r), dim=1) # Concat Multi-Scale Gradient x_5 = self.fc5(x_4) return x_5 class DCGAN(pl.LightningModule): def __init__(self, hparams, checkpoint_folder, experiment_name): super().__init__() self.hparams = hparams self.checkpoint_folder = checkpoint_folder self.experiment_name = experiment_name # networks self.generator = Generator(ngf=hparams.ngf, nc=hparams.nc, latent_dim=hparams.latent_dim) self.discriminator = Discriminator(ndf=hparams.ndf, nc=hparams.nc) self.generator.apply(weights_init) self.discriminator.apply(weights_init) # cache for generated images self.generated_imgs = None self.last_imgs = None # For experience replay self.exp_replay_dis = torch.tensor([]) def forward(self, z): return self.generator(z) def adversarial_loss(self, y_hat, y): return F.binary_cross_entropy(y_hat, y) def training_step(self, batch, batch_nb, optimizer_idx): # For adding Instance noise for more visit: https://www.inference.vc/instance-noise-a-trick-for-stabilising-gan-training/ std_gaussian = max(0, self.hparams.level_of_noise - ( (self.hparams.level_of_noise * 2) * (self.current_epoch / self.hparams.epochs))) AddGaussianNoiseInst = AddGaussianNoise(std=std_gaussian) # the noise decays over time imgs, _ = batch imgs = AddGaussianNoiseInst(imgs) # Adding instance noise to real images self.last_imgs = imgs # train generator if optimizer_idx == 0: # sample noise z = torch.randn(imgs.shape[0], self.hparams.latent_dim, 1, 1).cuda() # generate images self.generated_imgs = self(z) # ground truth result (ie: all fake) g_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, False), get_valid_labels(self.generated_imgs[0])) # adversarial loss is binary cross-entropy; [0] is the image of the last layer tqdm_dict = {'g_loss': g_loss} log = {'g_loss': g_loss, "std_gaussian": std_gaussian} output = OrderedDict({ 'loss': g_loss, 'progress_bar': tqdm_dict, 'log': log }) return output # train discriminator if optimizer_idx == 1: # Measure discriminator's ability to classify real from generated samples # how well can it label as real? real_loss = self.adversarial_loss( self.discriminator([imgs, resize2d(imgs, 4), resize2d(imgs, 8), resize2d(imgs, 16), resize2d(imgs, 32), resize2d(imgs, 64)], False), get_valid_labels(imgs)) fake_loss = self.adversarial_loss(self.discriminator(self.generated_imgs, True), get_unvalid_labels( self.generated_imgs[0])) # how well can it label as fake?; [0] is the image of the last layer # discriminator loss is the average of these d_loss = (real_loss + fake_loss) / 2 tqdm_dict = {'d_loss': d_loss} log = {'d_loss': d_loss, "std_gaussian": std_gaussian} output = OrderedDict({ 'loss': d_loss, 'progress_bar': tqdm_dict, 'log': log }) return output def configure_optimizers(self): lr_gen = self.hparams.lr_gen lr_dis = self.hparams.lr_dis b1 = self.hparams.b1 b2 = self.hparams.b2 opt_g = torch.optim.Adam(self.generator.parameters(), lr=lr_gen, betas=(b1, b2)) opt_d = torch.optim.Adam(self.discriminator.parameters(), lr=lr_dis, betas=(b1, b2)) return [opt_g, opt_d], [] def backward(self, trainer, loss, optimizer, optimizer_idx: int) -> None: loss.backward(retain_graph=True) def train_dataloader(self): # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)), # transforms.ToTensor(), # transforms.Normalize([0.5], [0.5])]) # dataset = torchvision.datasets.MNIST(os.getcwd(), train=False, download=True, transform=transform) # return DataLoader(dataset, batch_size=self.hparams.batch_size) # transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)), # transforms.ToTensor(), # transforms.Normalize([0.5], [0.5]) # ]) # train_dataset = torchvision.datasets.ImageFolder( # root="./drive/My Drive/datasets/flower_dataset/", # # root="./drive/My Drive/datasets/ghibli_dataset_small_overfit/", # transform=transform # ) # return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True, # batch_size=self.hparams.batch_size) transform = transforms.Compose([transforms.Resize((self.hparams.image_size, self.hparams.image_size)), transforms.ToTensor(), transforms.Normalize([0.5], [0.5]) ]) train_dataset = torchvision.datasets.ImageFolder( root="ghibli_dataset_small_overfit/", transform=transform ) return DataLoader(train_dataset, num_workers=self.hparams.num_workers, shuffle=True, batch_size=self.hparams.batch_size) def on_epoch_end(self): z = torch.randn(4, self.hparams.latent_dim, 1, 1).cuda() # match gpu device (or keep as cpu) if self.on_gpu: z = z.cuda(self.last_imgs.device.index) # log sampled images sample_imgs = self.generator(z)[0] torchvision.utils.save_image(sample_imgs, f'generated_images_epoch{self.current_epoch}.png') # save model if self.current_epoch % self.hparams.save_model_every_epoch == 0: trainer.save_checkpoint( self.checkpoint_folder + "/" + self.experiment_name + "_epoch_" + str(self.current_epoch) + ".ckpt") from argparse import Namespace args = { 'batch_size': 128, # batch size 'lr_gen': 0.0003, # TTUR;learnin rate of both networks; tested value: 0.0002 'lr_dis': 0.0003, # TTUR;learnin rate of both networks; tested value: 0.0002 'b1': 0.5, # Momentum for adam; tested value(dcgan paper): 0.5 'b2': 0.999, # Momentum for adam; tested value(dcgan paper): 0.999 'latent_dim': 256, # tested value which worked(in V4_1): 100 'nc': 3, # number of color channels 'ndf': 8, # number of discriminator features 'ngf': 8, # number of generator features 'epochs': 4, # the maxima lamount of epochs the algorith should run 'save_model_every_epoch': 1, # how often we save our model 'image_size': 128, # size of the image 'num_workers': 3, 'level_of_noise': 0.1, # how much instance noise we introduce(std; tested value: 0.15 and 0.1 'experience_save_per_batch': 1, # this value should be very low; tested value which works: 1 'experience_batch_size': 50 # this value shouldnt be too high; tested value which works: 50 } hparams = Namespace(**args) # Parameters experiment_name = "DCGAN_6_2_MNIST_128px" dataset_name = "mnist" checkpoint_folder = "DCGAN/" tags = ["DCGAN", "128x128"] dirpath = Path(checkpoint_folder) # defining net net = DCGAN(hparams, checkpoint_folder, experiment_name) torch.autograd.set_detect_anomaly(True) trainer = pl.Trainer( # resume_from_checkpoint="DCGAN_V4_2_GHIBLI_epoch_999.ckpt", max_epochs=args["epochs"], gpus=1 ) trainer.fit(net)
``
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With