Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

TypeError: can't pickle _thread.RLock objects

After checking all the existing answers on Stackoverflow here: Checkpointing keras model: TypeError: can't pickle _thread.lock objects and here: TypeError: can't pickle _thread.lock objects, I haven't found out why this won't work or what's wrong in my case.

I am using Python 3. Here is my model building functions:

def upsample_and_concat(x1,x2,output_channels,in_channels,layer):
  pool_size = 2
  deconv_filter = tf.Variable(tf.truncated_normal([pool_size, pool_size, output_channels, in_channels], stddev=0.02))
  deconvtf=tf.nn.conv2d_transpose(x1, deconv_filter, tf.shape(x2), strides=[1, pool_size, pool_size, 1])
  deconv_output = tf.concat([deconvtf, x2], 3)
  deconv_output.set_shape([None, None, None, output_channels * 2])
  return deconv_output

def Depth_to_space_tf(input):
  return tf.depth_to_space(input, 2)
  
def build_model():
  inputs=keras.layers.Input(shape=(None, None, 4))
  
  conv1=keras.layers.Conv2D(filters=256,kernel_size=(3,3),strides=(1,1),padding='same',name='conv1_1')(inputs)
  conv1=keras.layers.LeakyReLU(alpha=0.2,name='conv1_1_relu')(conv1)
  conv1=keras.layers.Conv2D(filters=256,kernel_size=(3,3),strides=(1,1),padding='same',name='conv1_2')(conv1)
  conv1=keras.layers.LeakyReLU(alpha=0.2,name='conv1_2_relu')(conv1)
  pool1=keras.layers.MaxPooling2D(pool_size=(2,2),padding="same",name='pool1')(conv1)

  conv2=keras.layers.Conv2D(filters=512,kernel_size=(3,3),strides=(1,1),padding='same',name='conv2_1')(pool1)
  conv2=keras.layers.LeakyReLU(alpha=0.2,name='conv2_1_relu')(conv2)
  conv2=keras.layers.Conv2D(filters=512,kernel_size=(3,3),strides=(1,1),padding='same',name='conv2_2')(conv2)
  conv2=keras.layers.LeakyReLU(alpha=0.2,name='conv2_2_relu')(conv2)

  up6=keras.layers.core.Lambda(upsample_and_concat,arguments={'x2':conv1,'output_channels':256,'in_channels':512,'layer':'upsample_concat_1'},name='upsample_concat_1')(conv2)
  conv6=keras.layers.Conv2D(filters=256,kernel_size=(3,3),strides=(1,1),padding='same',name='conv6_1')(up6)
  conv6=keras.layers.LeakyReLU(alpha=0.2,name='conv6_1_relu')(conv6)
  conv6=keras.layers.Conv2D(filters=256,kernel_size=(3,3),strides=(1,1),padding='same',name='conv6_2')(conv6)
  conv6=keras.layers.LeakyReLU(alpha=0.2,name='conv6_2_relu')(conv6)
  
  conv7=keras.layers.Conv2D(filters=12,kernel_size=(1,1),strides=(1,1),name='conv10')(conv6)

  predictions = keras.layers.core.Lambda(Depth_to_space_tf,name='depth_to_space')(conv7)
  model = keras.models.Model(inputs=inputs, outputs=predictions)
  
  return model

Here is my data loader code:

class DataLoader(keras.utils.Sequence):
  
  def __init__(self,params,data):
    self.epochs=params.epochs
    self.input_dir=params.input_dir
    self.gt_dir=params.gt_dir
    self.train_ids=data.train_ids
    self.shuffled_ids=np.random.permutation(len(self.train_ids))
    for index, val in np.ndenumerate(self.shuffled_ids):
      print ('index:{}, image:{}'.format(index[0], val))
    self.epoch_counter=0
    self.ps=params.ps
    
  def on_epoch_end(self):
    self.shuffled_ids=np.random.permutation(len(self.train_ids))
    print("in on epoch end")

  def __len__(self):
    'Denotes the number of batches per epoch'
    return len(self.train_ids)

  def __getitem__(self, ind):
    'Generates data containing batch_size samples'
    train_id = self.train_ids[self.shuffled_ids[ind]]
    in_files = glob.glob(self.input_dir + '%05d_00*.ARW' % train_id)
    in_path = in_files[np.random.randint(0, len(in_files))]
    in_fn = os.path.basename(in_path)

    gt_files = glob.glob(self.gt_dir + '%05d_00*.ARW' % train_id)
    gt_path = gt_files[0]
    gt_fn = os.path.basename(gt_path)
    in_exposure = float(in_fn[9:-5])
    gt_exposure = float(gt_fn[9:-5])
    ratio = min(gt_exposure / in_exposure, 300)

    st = time.time()
    raw = rawpy.imread(in_path)
    input_images = np.expand_dims(pack_raw(raw), axis=0) * ratio
    gt_raw = rawpy.imread(gt_path)
    im = gt_raw.postprocess(use_camera_wb=True,
                  half_size=False,
                  no_auto_bright=True, output_bps=16)
    gt_images = np.expand_dims(np.float32(im / 65535.0),axis=0) 

    H = input_images.shape[1] 
    W = input_images.shape[2]

    xx = np.random.randint(0, W - self.ps)
    yy = np.random.randint(0, H - self.ps)
    input_patch = input_images[:, yy:yy + self.ps, xx:xx + self.ps, :]
    gt_patch = gt_images[:, yy * 2:yy * 2 + self.ps * 2, xx * 2:xx * 2 + self.ps * 2, :]

    if np.random.randint(2) == 1:
      input_patch = np.flip(input_patch, axis=1)
      gt_patch = np.flip(gt_patch, axis=1)
    if np.random.randint(2) == 1: 
      input_patch = np.flip(input_patch, axis=2)
      gt_patch = np.flip(gt_patch, axis=2)
    if np.random.randint(2) == 1: 
      input_patch = np.transpose(input_patch, (0, 2, 1, 3))
      gt_patch = np.transpose(gt_patch, (0, 2, 1, 3))\

    input_patch = np.minimum(input_patch, 1.0)

    return (input_patch,gt_patch)

when using the instance of the dataloader class in model.fit_generator with

callbacks = [
    ModelCheckpoint(filepath=save_fname, monitor='loss', verbose=1, save_best_only=True)
]

I get the below error exactly when the model tries to save the checkpoint to the file:

TypeError: can't pickle _thread.RLock objects

Full stack trace:

    ---------------------------------------------------------------------------
    TypeError                                 Traceback (most recent call last)
    <ipython-input-52-fde8b84b2b2d> in <module>()
          1 """ ================ TRAIN THE MODEL ================ """
          2 steps_per_epoch=1
    ----> 3 model_history=train_generator(model,dataloader,params.epochs,steps_per_epoch,save_fname)

    32 frames
    <ipython-input-48-2a9e71ef3338> in train_generator(model, data_gen, epochs, steps_per_epoch, save_fname)
         11       max_queue_size=20,
         12       workers=7,
    ---> 13       use_multiprocessing=True
         14   )
         15   print("Model Training completed!!! Jay Yogeshwar!!!")

    /usr/local/lib/python3.6/dist-packages/keras/legacy/interfaces.py in wrapper(*args, **kwargs)
         89                 warnings.warn('Update your `' + object_name + '` call to the ' +
         90                               'Keras 2 API: ' + signature, stacklevel=2)
    ---> 91             return func(*args, **kwargs)
         92         wrapper._original_function = func
         93         return wrapper

    /usr/local/lib/python3.6/dist-packages/keras/engine/training.py in fit_generator(self, generator, steps_per_epoch, epochs, verbose, callbacks, validation_data, validation_steps, class_weight, max_queue_size, workers, use_multiprocessing, shuffle, initial_epoch)
       1416             use_multiprocessing=use_multiprocessing,
       1417             shuffle=shuffle,
    -> 1418             initial_epoch=initial_epoch)
       1419 
       1420     @interfaces.legacy_generator_methods_support

    /usr/local/lib/python3.6/dist-packages/keras/engine/training_generator.py in fit_generator(model, generator, steps_per_epoch, epochs, verbose, callbacks, validation_data, validation_steps, class_weight, max_queue_size, workers, use_multiprocessing, shuffle, initial_epoch)
        249                     break
        250 
    --> 251             callbacks.on_epoch_end(epoch, epoch_logs)
        252             epoch += 1
        253             if callback_model.stop_training:

    /usr/local/lib/python3.6/dist-packages/keras/callbacks.py in on_epoch_end(self, epoch, logs)
         77         logs = logs or {}
         78         for callback in self.callbacks:
    ---> 79             callback.on_epoch_end(epoch, logs)
         80 
         81     def on_batch_begin(self, batch, logs=None):

    /usr/local/lib/python3.6/dist-packages/keras/callbacks.py in on_epoch_end(self, epoch, logs)
        444                             self.model.save_weights(filepath, overwrite=True)
        445                         else:
    --> 446                             self.model.save(filepath, overwrite=True)
        447                     else:
        448                         if self.verbose > 0:

    /usr/local/lib/python3.6/dist-packages/keras/engine/network.py in save(self, filepath, overwrite, include_optimizer)
       1088             raise NotImplementedError
       1089         from ..models import save_model
    -> 1090         save_model(self, filepath, overwrite, include_optimizer)
       1091 
       1092     def save_weights(self, filepath, overwrite=True):

    /usr/local/lib/python3.6/dist-packages/keras/engine/saving.py in save_model(model, filepath, overwrite, include_optimizer)
        380 
        381     try:
    --> 382         _serialize_model(model, f, include_optimizer)
        383     finally:
        384         if opened_new_file:

    /usr/local/lib/python3.6/dist-packages/keras/engine/saving.py in _serialize_model(model, f, include_optimizer)
         81     model_config = {}
         82     model_config['class_name'] = model.__class__.__name__
    ---> 83     model_config['config'] = model.get_config()
         84     model_config = json.dumps(model_config, default=get_json_type)
         85     model_config = model_config.encode('utf-8')

    /usr/local/lib/python3.6/dist-packages/keras/engine/network.py in get_config(self)
        929             model_outputs.append([layer.name, new_node_index, tensor_index])
        930         config['output_layers'] = model_outputs
    --> 931         return copy.deepcopy(config)
        932 
        933     @classmethod

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_list(x, memo, deepcopy)
        213     append = y.append
        214     for a in x:
    --> 215         append(deepcopy(a, memo))
        216     return y
        217 d[list] = _deepcopy_list

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        178                     y = x
        179                 else:
    --> 180                     y = _reconstruct(x, memo, *rv)
        181 
        182     # If is its own copy, don't memoize.

    /usr/lib/python3.6/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
        278     if state is not None:
        279         if deep:
    --> 280             state = deepcopy(state, memo)
        281         if hasattr(y, '__setstate__'):
        282             y.__setstate__(state)

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        178                     y = x
        179                 else:
    --> 180                     y = _reconstruct(x, memo, *rv)
        181 
        182     # If is its own copy, don't memoize.

    /usr/lib/python3.6/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
        278     if state is not None:
        279         if deep:
    --> 280             state = deepcopy(state, memo)
        281         if hasattr(y, '__setstate__'):
        282             y.__setstate__(state)

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        178                     y = x
        179                 else:
    --> 180                     y = _reconstruct(x, memo, *rv)
        181 
        182     # If is its own copy, don't memoize.

    /usr/lib/python3.6/copy.py in _reconstruct(x, memo, func, args, state, listiter, dictiter, deepcopy)
        278     if state is not None:
        279         if deep:
    --> 280             state = deepcopy(state, memo)
        281         if hasattr(y, '__setstate__'):
        282             y.__setstate__(state)

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        148     copier = _deepcopy_dispatch.get(cls)
        149     if copier:
    --> 150         y = copier(x, memo)
        151     else:
        152         try:

    /usr/lib/python3.6/copy.py in _deepcopy_dict(x, memo, deepcopy)
        238     memo[id(x)] = y
        239     for key, value in x.items():
    --> 240         y[deepcopy(key, memo)] = deepcopy(value, memo)
        241     return y
        242 d[dict] = _deepcopy_dict

    /usr/lib/python3.6/copy.py in deepcopy(x, memo, _nil)
        167                     reductor = getattr(x, "__reduce_ex__", None)
        168                     if reductor:
    --> 169                         rv = reductor(4)
        170                     else:
        171                         reductor = getattr(x, "__reduce__", None)

    TypeError: can't pickle _thread.RLock objects

I am new to python and keras and not able to figure out what's wrong here. As far as other explanations, it's failing to serialize something while saving the model. But I am not able to understand what is failing to serialize here.

like image 777
KCK Avatar asked Jul 27 '19 15:07

KCK


1 Answers

I think TFer2 got it right. The issue is that TensorFlow models are not natively serializable by pickling. Somewhere (I think it's in your callback) deepcopy is being called on a Model. To confirm this, you can try to apply this hotfix, or an improved version here.

like image 137
LoveToCode Avatar answered Oct 23 '22 07:10

LoveToCode