I'm relatively new to the world of TensorFlow, and pretty perplexed by how you'd actually read CSV data into usable example/label tensors in TensorFlow. The example from the TensorFlow tutorial on reading CSV data is pretty fragmented and only gets you part of the way to being able to train on CSV data.
Here's my code that I've pieced together, based off that CSV tutorial:
from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# setup text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# setup CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1,col2,col3,col4])

print("loading, " + str(file_length) + " line(s)\n")

with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(file_length):
        # retrieve a single instance
        example, label = sess.run([features, col5])
        print(example, label)

    coord.request_stop()
    coord.join(threads)
    print("\ndone loading")
And here is a brief example from the CSV file I'm loading. It's pretty basic data: 4 feature columns and 1 label column:
0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0
All the code above does is print each example from the CSV file, one by one, which, while nice, is pretty darn useless for training.
What I'm struggling with here is how you'd actually turn those individual examples, loaded one-by-one, into a training dataset. For example, here's a notebook I was working on in the Udacity Deep Learning course. I basically want to take the CSV data I'm loading, and plop it into something like train_dataset and train_labels:
def reformat(dataset, labels):
    dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
    # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
    labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32)
    return dataset, labels

train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)
print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)
I've tried using tf.train.shuffle_batch, like this, but it just inexplicably hangs:
for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)
So to sum up, here are my questions:

- What am I missing about this process? It seems like there's some key intuition I'm missing about how to properly build an input pipeline.
- Is there a way to avoid having to know the length of the file up front? It feels pretty inelegant to have to know the number of lines you want to process (the for i in range(file_length) line of code above).

Edit: As soon as Yaroslav pointed out that I was likely mixing up imperative and graph-construction parts here, it started to become clearer. I was able to pull together the following code, which I think is closer to what would typically be done when training a model from CSV (excluding any model-training code):
from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    _, csv_row = reader.read(filename_queue)
    record_defaults = [[0],[0],[0],[0],[0]]
    colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
    features = tf.stack([colHour,colQuarter,colAction,colUser])
    label = tf.stack([colLabel])
    return features, label

def input_pipeline(batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)
    example, label = read_from_csv(filename_queue)
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    try:
        while not coord.should_stop():
            example_batch, label_batch = sess.run([examples, labels])
            print(example_batch)
    except tf.errors.OutOfRangeError:
        print('Done training, epoch reached')
    finally:
        coord.request_stop()
        coord.join(threads)
With the Python standard library, you can use the csv module and its reader() function to load your CSV files. Once loaded, the rows can be converted to a NumPy array for use in machine learning.
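For example, a minimal sketch using the question's csv_test_data.csv layout (4 feature columns, 1 label column, with a header row):

import csv
import numpy as np

# Load the CSV with the standard-library reader, then convert to a NumPy array.
with open('csv_test_data.csv') as f:
    reader = csv.reader(f, delimiter=',')
    next(reader)                      # skip the header row
    rows = [row for row in reader]

data = np.array(rows).astype(np.float32)
features, labels = data[:, :4], data[:, 4]   # 4 feature columns, 1 label column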
For any small CSV dataset, the simplest way to train a TensorFlow model on it is to load it into memory as a pandas DataFrame or a NumPy array. A relatively simple example is the abalone dataset: it is small, and all the input features are limited-range floating point values.
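A sketch of that approach follows; the file name and column names are assumptions and should be adjusted to match your copy of the dataset:

import numpy as np
import pandas as pd
import tensorflow as tf

np.set_printoptions(precision=3, suppress=True)  # make numpy values easier to read

# Assumed file name and column names for the abalone dataset.
abalone = pd.read_csv('abalone_train.csv',
                      names=['Length', 'Diameter', 'Height', 'Whole weight',
                             'Shucked weight', 'Viscera weight', 'Shell weight', 'Age'])

features = abalone.copy()
labels = features.pop('Age')          # separate the label column

# A DataFrame of uniform floats converts cleanly to a single NumPy array.
model = tf.keras.Sequential([tf.keras.layers.Dense(64, activation='relu'),
                             tf.keras.layers.Dense(1)])
model.compile(loss=tf.keras.losses.MeanSquaredError(),
              optimizer=tf.keras.optimizers.Adam())
model.fit(np.array(features), np.array(labels), epochs=10)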
Now we create a dataset from the given CSV data. To do this we will use tf.data.experimental.make_csv_dataset. We could also use a pandas DataFrame to build NumPy arrays and pass those to TensorFlow, but the disadvantage of that approach is that it cannot handle datasets too large to fit in memory.
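A minimal sketch, assuming the CSV has a header row and that its label column is named 'label' (both assumptions):

import tensorflow as tf

# Batches come back as a (features_dict, labels) pair; make_csv_dataset
# splits out the label column for you and infers column types from the header.
dataset = tf.data.experimental.make_csv_dataset(
    'csv_test_data.csv',      # assumed file name
    batch_size=32,
    label_name='label',       # assumed name of the label column
    num_epochs=1,
    shuffle=True)

for feature_batch, label_batch in dataset.take(1):
    print(label_batch)
    for name, values in feature_batch.items():
        print(name, values)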
The raw data can easily be loaded as a Pandas DataFrame, but is not immediately usable as input to a TensorFlow model. Because of the different data types and ranges, you can't simply stack the features into a NumPy array and pass it to a tf.keras.Sequential model. Each column needs to be handled individually.
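For example, with recent TF 2.x versions, each numeric column can get its own Normalization layer before the results are concatenated. A sketch (the stand-in DataFrame and column names are assumptions):

import numpy as np
import pandas as pd
import tensorflow as tf

# Tiny stand-in DataFrame; in practice this would be your loaded CSV.
df = pd.DataFrame({'Length': [0.35, 0.53, 0.44], 'Diameter': [0.26, 0.42, 0.36]})

# One symbolic input per column, so each column can be preprocessed individually.
inputs = {name: tf.keras.Input(shape=(1,), name=name, dtype=tf.float32)
          for name in df.columns}

normalized = []
for name, inp in inputs.items():
    norm = tf.keras.layers.Normalization(axis=None)
    norm.adapt(df[name].to_numpy())        # learn this column's mean/variance
    normalized.append(norm(inp))

# Concatenate the per-column results into a single feature vector.
x = tf.keras.layers.Concatenate()(normalized)
preprocessing_model = tf.keras.Model(inputs, x)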
Note: The tf.data.experimental.snapshot files are meant for temporary storage of a dataset while in use. This is not a format for long term storage. The file format is considered an internal detail, and not guaranteed between TensorFlow versions.
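For reference, a sketch of how snapshot is applied (assuming a recent TF 2.x version; the path is an assumption):

import tensorflow as tf

dataset = tf.data.Dataset.range(10)
# Caches the dataset to disk on the first pass, rereads it on later passes.
dataset = dataset.apply(tf.data.experimental.snapshot('/tmp/my_snapshot'))
for x in dataset:
    print(x.numpy())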
I think you are mixing up imperative and graph-construction parts here. The operation tf.train.shuffle_batch creates a new queue node, and a single node can be used to process the entire dataset. So I think you are hanging because you created a bunch of shuffle_batch queues in your for loop and didn't start queue runners for them.
Normal input pipeline usage looks like this:

1. Add shuffle_batch to input pipeline
2. --- end of graph construction, beginning of imperative programming ---
3. tf.start_queue_runners
4. while(True): session.run()
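In skeleton form, that pattern looks something like this (a minimal TF 1.x-style sketch; read_example is a hypothetical stand-in for your reader/decoder code):

import tensorflow as tf

# --- graph construction ---
filename_queue = tf.train.string_input_producer(["data.csv"], num_epochs=1)
example, label = read_example(filename_queue)   # hypothetical reader/decoder
example_batch, label_batch = tf.train.shuffle_batch(
    [example, label], batch_size=32, capacity=2000, min_after_dequeue=1000)

# --- imperative part ---
with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())
    sess.run(tf.initialize_local_variables())   # num_epochs uses a local variable
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    try:
        while not coord.should_stop():
            sess.run([example_batch, label_batch])
    except tf.errors.OutOfRangeError:
        pass                                    # queue exhausted after one epoch
    finally:
        coord.request_stop()
        coord.join(threads)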
To be more scalable (to avoid the Python GIL), you could generate all of your data using a TensorFlow pipeline. However, if performance is not critical, you can hook up a numpy array to an input pipeline by using slice_input_producer.
Here's an example with some Print nodes to see what's going on (messages in Print go to stdout when the node is run):
import numpy as np
import tensorflow as tf

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples * num_features), (num_examples, num_features))
print(data)

# slice_input_producer dequeues one row of the constant at a time
(data_node,) = tf.train.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.train.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
sess.run(tf.initialize_local_variables())  # num_epochs is tracked in a local variable
tf.get_default_graph().finalize()
tf.train.start_queue_runners()

try:
    while True:
        print(sess.run(data_batch_debug))
except tf.errors.OutOfRangeError as e:
    print("No more inputs.")
You should see something like this:

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.
The "8, 9" numbers didn't fill up the full batch, so they didn't get produced. Also tf.Print
are printed to sys.stdout, so they show up in separately in Terminal for me.
PS: a minimal example of connecting batch to a manually initialized queue is in GitHub issue 2193.
Also, for debugging purposes you might want to set a timeout on your session so that your IPython notebook doesn't hang on empty queue dequeues. I use this helper function for my sessions:
def create_session():
    config = tf.ConfigProto(log_device_placement=True)
    config.gpu_options.per_process_gpu_memory_fraction = 0.3  # don't hog all vRAM
    config.operation_timeout_in_ms = 60000   # terminate on long hangs
    # create interactive session to register a default session
    sess = tf.InteractiveSession("", config=config)
    return sess
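With operation_timeout_in_ms set, a run that blocks on an empty queue fails with tf.errors.DeadlineExceededError after 60 seconds instead of hanging the notebook indefinitely.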
Scalability Notes:

1. tf.constant inlines a copy of your data into the Graph. There's a fundamental limit of 2GB on the size of the Graph definition, so that's an upper limit on the size of your data.
2. You could get around that limit by using v = tf.Variable and saving the data into it by running v.assign_op with a tf.placeholder on the right-hand side and feeding a numpy array to the placeholder (feed_dict); see the sketch after this list.
3. That still creates two copies of the data, so to save memory you could make your own version of slice_input_producer which operates on numpy arrays, and uploads rows one at a time using feed_dict.
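A sketch of the variable-plus-placeholder pattern from the second note (the array shape and dtype are assumptions):

import numpy as np
import tensorflow as tf

data = np.random.rand(10000, 4).astype(np.float32)

# The placeholder keeps the actual data out of the GraphDef (no 2GB issue).
data_initializer = tf.placeholder(dtype=tf.float32, shape=data.shape)
# collections=[] keeps the variable out of GLOBAL_VARIABLES so generic
# initializers don't try to re-initialize it.
v = tf.Variable(data_initializer, trainable=False, collections=[])

sess = tf.Session()
# Feed the numpy array once; afterwards the data lives in the variable.
sess.run(v.initializer, feed_dict={data_initializer: data})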