Upgrade to tf.data Dataset not working properly when parsing CSV

I have a GCMLE experiment and I am trying to upgrade my input_fn to use the new tf.data functionality. I created the following input_fn based on this sample:

def input_fn(...):
    dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards) # shuffle up the list of input files
    dataset = dataset.interleave(lambda filename: # mix together records from cycle_length number of shards
                tf.data.TextLineDataset(filename).skip(1).map(lambda row: parse_csv(row, hparams)), cycle_length=5) 
    if shuffle:
      dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    dataset = dataset.batch(batch_size)
    iterator = dataset.make_one_shot_iterator()
    features = iterator.get_next()

    labels = features.pop(LABEL_COLUMN)

    return features, labels

My parse_csv is the same as what I used previously, but it is not currently working. I can fix some of the issues, but I don't fully understand why I am having them. Here is the start of my parse_csv() function:

def parse_csv(..):
    columns = tf.decode_csv(rows, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    words = tf.string_split(raw_features['sentences']) # splitting words
    vocab_table = tf.contrib.lookup.index_table_from_file(vocabulary_file = hparams.vocab_file,
                default_value = 0)

....
  1. Right away, tf.string_split() stops working with the error ValueError: Shape must be rank 1 but is rank 0 for 'csv_preprocessing/input_sequence_generation/StringSplit' (op: 'StringSplit') with input shapes: [], []. This is easily solved by packing raw_features['sentences'] into a rank-1 tensor via [raw_features['sentences']], but I do not understand why this is needed with the dataset approach. Why did this work fine in the old version? For the shapes to match up with the rest of my model, I end up needing to remove this extra dimension at the end via words = tf.squeeze(words, 0), because I add this "unnecessary" dimension to the tensor. (A sketch of the workaround is below, after this list.)

  2. For whatever reason, I am also getting an error that the table is not initialized: tensorflow.python.framework.errors_impl.FailedPreconditionError: Table not initialized. However, this code works completely fine with my old input_fn() (see below), so I don't know why I would now need to initialize the tables. I have not figured out a solution to this part. Is there anything I am missing in order to use tf.contrib.lookup.index_table_from_file within my parse_csv function?
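
For reference, a minimal sketch of how the workaround for issue 1 fits into parse_csv() is below. The sparse-to-dense conversion is only there for illustration (tf.string_split() returns a SparseTensor, while tf.squeeze() needs a dense tensor); the rest mirrors the snippet above.

def parse_csv(row, hparams):
    # `row` is a single CSV line here, so decode_csv() yields scalar columns.
    columns = tf.decode_csv(row, record_defaults=CSV_COLUMN_DEFAULTS)
    raw_features = dict(zip(FIELDNAMES, columns))

    # tf.string_split() expects a rank-1 input, so wrap the scalar in a list...
    words = tf.string_split([raw_features['sentences']])
    # ...convert the resulting SparseTensor to dense (illustrative step only)...
    words = tf.sparse_tensor_to_dense(words, default_value='')
    # ...and drop the extra leading dimension added by the wrapping above.
    words = tf.squeeze(words, 0)
    ...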

For reference, this is my old input_fn() that still does work:

def input_fn(...):
    filename_queue = tf.train.string_input_producer(tf.train.match_filenames_once(filenames), 
                num_epochs=num_epochs, shuffle=shuffle, capacity=32)
    reader = tf.TextLineReader(skip_header_lines=skip_header_lines)

    _, rows = reader.read_up_to(filename_queue, num_records=batch_size)

    features = parse_csv(rows, hparams)


    if shuffle:
        features = tf.train.shuffle_batch(
            features,
            batch_size,
            min_after_dequeue=2 * batch_size + 1,
            capacity=batch_size * 10,
            num_threads=multiprocessing.cpu_count(),
            enqueue_many=True,
            allow_smaller_final_batch=True
        )
    else:
        features = tf.train.batch(
            features,
            batch_size,
            capacity=batch_size * 10,
            num_threads=multiprocessing.cpu_count(),
            enqueue_many=True,
            allow_smaller_final_batch=True
        )

    labels = features.pop(LABEL_COLUMN)

    return features, labels

UPDATE TF 1.7

I am revisiting this with TF 1.7 (which should have all of the TF 1.6 features mentioned in @mrry's answer), but I'm still unable to replicate the old performance. With my old input_fn() I am able to get around 13 steps/sec. The new function that I am using is as follows:

def input_fn(...):
    files = tf.data.Dataset.list_files(filenames).shuffle(num_shards)
    dataset = files.apply(tf.contrib.data.parallel_interleave(
            lambda filename: tf.data.TextLineDataset(filename).skip(1),
            cycle_length=num_shards))
    dataset = dataset.apply(tf.contrib.data.map_and_batch(lambda row:
            parse_csv_dataset(row, hparams = hparams), 
            batch_size = batch_size, 
            num_parallel_batches = multiprocessing.cpu_count())) 
    dataset = dataset.prefetch(1)
    if shuffle:
        dataset = dataset.shuffle(buffer_size = 10000)
    dataset = dataset.repeat(num_epochs)
    iterator = dataset.make_initializable_iterator()
    features = iterator.get_next()
    tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer)

    labels = {key: features.pop(key) for key in LABEL_COLUMNS}

    return features, labels 

I believe that I am following all of the performance guidelines, such as 1) use prefetch, 2) use map_and_batch with num_parallel_batches = cores, 3) use parallel_interleave, and 4) apply shuffle before the repeat. The only steps I am not using are the cache suggestion, which I would expect to really only help for epochs beyond the first one, and "applying interleave, prefetch and shuffle first" -- in fact I found that having prefetch and shuffle after the map_and_batch gave a ~10% speedup. (A sketch of where cache would slot in is below.)
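
For completeness, here is a sketch of where I think the cache suggestion would slot in if I did try it (not something I have benchmarked; the placement is just my reading of the guidelines):

dataset = dataset.apply(tf.contrib.data.map_and_batch(
        lambda row: parse_csv_dataset(row, hparams=hparams),
        batch_size=batch_size,
        num_parallel_batches=multiprocessing.cpu_count()))
dataset = dataset.cache()  # or dataset.cache('/some/local/path') if it does not fit in memory
dataset = dataset.prefetch(1)
if shuffle:
    dataset = dataset.shuffle(buffer_size=10000)
dataset = dataset.repeat(num_epochs)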

BUFFER ISSUE

The first performance issue that I am noticing is that with my old input_fn() it took me about 13 wall-clock minutes to get through 20k steps, and yet with a buffer_size of 10,000 (which I take to mean we are waiting until we have 10,000 batches processed) I am still waiting more than 40 minutes for the buffer to fill up. Does it make sense for this to take so long? If I know that my sharded .csv's on GCS are already randomized, is it acceptable to make this shuffle/buffer size smaller? I am trying to replicate the behavior of tf.train.shuffle_batch() -- however, it seems that at worst it should take the same 13 mins that it took to reach 10k steps in order to fill up the buffer?
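
For concreteness, the smaller-buffer variant I am asking about would just be (the number is only illustrative):

# In the pipeline above, shuffle() runs after map_and_batch(), so each buffered
# element is a full batch, and buffer_size=10000 means 10,000 batches have to be
# produced before the first training step. A smaller buffer for shards that are
# already randomized would be, e.g.:
if shuffle:
    dataset = dataset.shuffle(buffer_size=100)  # batches, not individual records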

STEPS/SEC

Even once the buffer has filled up, the global steps/sec tops out around 3 steps/sec (and is often as low as 2 steps/sec) on the same model that gets ~13 steps/sec with the previous input_fn().

SLOPPY INTERLEAVE

I finally tried to replace parallel_interleave() with sloppy_interleave(), as this is another suggestion from @mrry. When I switched to sloppy_interleave I got 14 steps/sec! I know this means that it is not deterministic, but shouldn't that really just mean it is not deterministic from one run (or epoch) to the next? Or are there larger implications? Should I be concerned about any real difference between the old shuffle_batch() method and sloppy_interleave? Does the fact that this results in a 4-5x improvement suggest what the previous blocking factor was?
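
For reference, the swap was just the interleave stage. To my understanding, parallel_interleave(..., sloppy=True) is the newer spelling of the same behavior, but I have only benchmarked sloppy_interleave() itself:

# Non-deterministic interleaving of the input shards (what I switched to):
dataset = files.apply(tf.contrib.data.sloppy_interleave(
    lambda filename: tf.data.TextLineDataset(filename).skip(1),
    cycle_length=num_shards))

# Presumably equivalent in TF 1.7 (sloppy=True relaxes the round-robin ordering):
# dataset = files.apply(tf.contrib.data.parallel_interleave(
#     lambda filename: tf.data.TextLineDataset(filename).skip(1),
#     cycle_length=num_shards, sloppy=True))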

1 Answer

In TF 1.4 (which is currently the latest version of TF that works with GCMLE) you will not be able to use make_one_shot_iterator() with lookup tables (see this relevant post); you will need to use Dataset.make_initializable_iterator() and then add iterator.initializer to the default TABLE_INITIALIZERS collection (from this post). Here is what the input_fn() should look like:

def input_fn(...):
  dataset = tf.data.Dataset.list_files(filenames).shuffle(num_shards)

  # Define `vocab_table` outside the map function and use it in `parse_csv()`.
  vocab_table = tf.contrib.lookup.index_table_from_file(
      vocabulary_file=hparams.vocab_file, default_value=0)

  dataset = dataset.interleave(
      lambda filename: (tf.data.TextLineDataset(filename)
                        .skip(1)
                        .map(lambda row: parse_csv(row, hparams),
                             num_parallel_calls=multiprocessing.cpu_count())),
      cycle_length=5) 

  if shuffle:
    dataset = dataset.shuffle(buffer_size=10000)
  dataset = dataset.repeat(num_epochs)
  dataset = dataset.batch(batch_size)
  iterator = dataset.make_initializable_iterator()
  features = iterator.get_next()

  # add iterator.initializer to be handled by the default table initializers
  tf.add_to_collection(tf.GraphKeys.TABLE_INITIALIZERS, iterator.initializer) 

  labels = features.pop(LABEL_COLUMN)

  return features, labels
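
As a note on why this is enough with an Estimator (as used by GCMLE): the default tf.train.Scaffold includes tf.tables_initializer() in its local_init_op, so everything in the TABLE_INITIALIZERS collection -- the vocab table and, thanks to the line above, the iterator -- gets initialized when the session is created. A rough sketch of the wiring (model_fn, job_dir, train_files, train_steps and the input_fn arguments are placeholders, not part of this answer):

# Hypothetical wiring; the Estimator's default Scaffold runs tf.tables_initializer(),
# which now also runs the dataset iterator's initializer added above.
estimator = tf.estimator.Estimator(model_fn=model_fn, model_dir=job_dir)
estimator.train(
    input_fn=lambda: input_fn(train_files, hparams, shuffle=True,
                              num_epochs=num_epochs, batch_size=batch_size),
    max_steps=train_steps)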