Problem Statement:
I'm trying to figure out how to use a keras.utils.Sequence object with the keras.model.fit_generator function. My code is below.
Code:
# -*- coding: utf-8 -*-
#!C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\python3.exe
# Imports
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
import sys
import os
import shutil
import glob
import scandir
import math
import multiprocessing as mp
from multiprocessing import Process, Queue, Event
import threading
import time
import csv
import pandas as pd
import keras
from keras.preprocessing.image import load_img, img_to_array
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Lambda, Cropping2D
from keras.layers import Conv2D, MaxPooling2D
from keras import backend as k
import sklearn
# Global Variables
im_shape = (160, 320, 3)
batch_size = 64
num_epochs = 10
num_classes = 1
top_crop = 70
btm_crop = 25
root = r'C:\Users\adam.hendry\Desktop\CarND-Term1\Behavioral Cloning\training_data'
def get_model():
model = Sequential()
model.add(Lambda(lambda x: x / 255.0 - 0.5, input_shape=im_shape))
model.add(Cropping2D(cropping=((top_crop,btm_crop),(0,0))))
model.add(Conv2D(24, (5,5), strides=(2,2), activation='relu'))
model.add(Conv2D(36, (5,5), strides=(2,2), activation='relu'))
model.add(Conv2D(48, (5,5), strides=(2,2), activation='relu'))
model.add(Conv2D(64, (3,3), activation='relu'))
model.add(Conv2D(64, (3,3), activation='relu'))
model.add(Flatten())
model.add(Dense(1164, activation='relu'))
model.add(Dense(100, activation='relu'))
model.add(Dense(50, activation='relu'))
model.add(Dense(10, activation='relu'))
model.add(Dense(num_classes))
model.compile(loss='mse', optimizer='adam')
return model
def merge_csvs_into_df():
out_df = pd.DataFrame()
corr_factor = 0.1
for path, folders, files in scandir.walk(root):
for fldr in folders:
for log_fl in glob.glob( os.path.join( path, fldr, '*.csv') ):
df = pd.read_csv( log_fl, header=None, names=fieldnames )
lft = df[['Left', 'Angle']]
ctr = df[['Center', 'Angle']]
rt = df[['Right', 'Angle']]
lft['Angle'] += corr_factor
rt['Angle'] -= corr_factor
ctr.columns = ['img_pth', 'angle']
lft.columns = ['img_pth', 'angle']
rt.columns = ['img_pth', 'angle']
out_df = out_df.append( ctr ).append( lft ).append( rt )
return out_df
def get_data( split_frac = 0.2 ):
train_frac = (1. - split_frac) ** 2
val_frac = (1. - split_frac) * split_frac
test_frac = split_frac
df = merge_csvs_into_df()
n_data = df.shape[0]
ntrain = math.floor( train_frac * n_data )
nval = math.floor( val_frac * n_data )
ntest = math.floor( test_frac * n_data )
df_train = df.iloc[:ntrain]
df_val = df.iloc[ntrain:ntrain+nval]
df_test = df.iloc[ntrain+nval:]
return df_train, df_val, df_test
class DataSequence(keras.utils.Sequence):
def __init__( self, df, batch_size = batch_size ):
self.X = df.img_pth.values
self.y = df.angle.values
self.batch_size = batch_size
def __len__(self):
return math.ceil( 1. * len( self.X ) / self.batch_size )
def __getitem__(self, idx):
batch_X = self.X[ idx * self.batch_size : (idx + 1) * self.batch_size ]
batch_y = self.y[ idx * self.batch_size : (idx + 1) * self.batch_size ]
X = np.array([ img_to_array( load_img( f, target_size=im_shape ) ) \
for f in batch_X ])
y = np.array(batch_y)
return X, y
class ThreadsafeIterator(object):
def __init__(self, it):
self.it = it
self.lock = threading.Lock()
def __iter__(self):
return self
def __next__(self):
with self.lock:
return next( self.it )
def threadsafe_generator( f ):
def g(*a, **kw):
return ThreadsafeIterator( f(*a, **kw) )
return g
@threadsafe_generator
def generator_from_df( df, batch_size, shuffle = True ):
nbatches, n_skipped_per_epoch = divmod( df.shape[0], batch_size )
epoch = 0
while True:
if shuffle:
# shuffle each epoch; use "tricky" `pandas.sample()` way
df = df.sample( frac = 1 ) # frac=1 is same as shuffling df
epoch += 1
i, j = 0, batch_size
mini_batches_completed = 0
for _ in range( nbatches ):
sub = df.iloc[i:j]
X = np.array( [ img_to_array( load_img( f, target_size=im_shape ) ) \
for f in sub.img_pth.values ] )
y = sub.angle.values
mini_batches_completed += 1
yield X, y
i = j
j += batch_size
def main(*args, **kargs):
""" Behavioral Cloning Program
"""
model = get_model()
df_train, df_val, _ = get_data()
ntrain, nval = df_train.shape[0], df_val.shape[0]
train_sequence = DataSequence( df_train )
val_generator = generator_from_df( df_val, batch_size, im_shape )
nbatches_train, mod_train = divmod( ntrain, batch_size )
nbatches_val, mod_val = divmod( nval, batch_size )
if mod_train > 0: nbatches_train += 1
if mod_val > 0: nbatches_val += 1
nworkers = mp.cpu_count()-1 or 1
model.fit_generator(
generator = train_sequence,
steps_per_epoch = nbatches_train,
epochs = num_epochs,
validation_data = val_generator,
validation_steps = nbatches_val,
workers = nworkers,
use_multiprocessing = True )
model.save('model.h5')
if __name__ == '__main__':
""" Entry point to the program
"""
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
main()
Question
After python successfully starts all my threads, I get the following error. Can someone please provide solution? Any help would be appreciated.
Error
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\multipro
cessing\pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\utils\data_utils.py", line 391, in get_index
return _SHARED_SEQUENCES[uid][i]
KeyError: 0
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\utils\data_utils.py", line 551, in get
inputs = self.queue.get(block=True).get()
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\multipro
cessing\pool.py", line 644, in get
raise self._value
KeyError: 0
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "behavioral_cloning_project_5.py", line 278, in <module>
main()
File "behavioral_cloning_project_5.py", line 270, in main
use_multiprocessing = True )
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\legacy\interfaces.py", line 87, in wrapper
return func(*args, **kwargs)
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\models.py", line 1227, in fit_generator
initial_epoch=initial_epoch)
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\legacy\interfaces.py", line 87, in wrapper
return func(*args, **kwargs)
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\engine\training.py", line 2115, in fit_generator
generator_output = next(output_generator)
File "C:\Users\adam.hendry\AppData\Local\Programs\Python\Python35\lib\site-pac
kages\keras\utils\data_utils.py", line 557, in get
six.raise_from(StopIteration(e), e)
File "<string>", line 3, in raise_from
StopIteration: 0
The same error happened with me, fixed it by using int
instead of math.ceil
in the __len__
function. to make sure it's gonna work you can debug it by doing the following
seq = DataSequence(""" pass parameters """)
l = len(seq)
last = seq[l-1] # try this now and if it gives you an error switch from math.ceil to int
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With