First, some quick background. My eventual goal is to train a fully connected neural network for a multi-class classification problem using the tensorflow framework.
The challenge is that the training data is huge (~2 TB). For training to work with limited memory, I want to save the training set into small files and train the model with mini-batch gradient descent, loading only one or a few files into memory at a time.
Say I already have two data frames of processed data: X_train (7 million rows * 200 named feature columns) and training_y (7 million rows * 1 label). How can I efficiently save these into TFRecord files, keeping column names, row index, etc., with each file containing, say, 100,000 entries? I know that with everything in TFRecord format I can use the shuffling and batching functionality built into tensorflow. I also need a very efficient way to write such records, because later I will need to write 2 TB of data into this format.
I tried searching Google for "how to write a pandas data frame to TFRecords" but didn't find any good examples. Most examples ask me to create a tf.train.Example column by column, row by row, and write it to a TFRecord file using tf.python_io.TFRecordWriter. I just want to confirm that this is the best I can do here.
If you have other suggestions for the problem I am trying to solve, they would be much appreciated too!
The default pandas data types are not the most memory efficient. This is especially true for text data columns with relatively few unique values (commonly referred to as “low-cardinality” data). By using more efficient data types, you can store larger datasets in memory.
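For example, a rough sketch of the idea (the column name and row count below are made up for illustration): converting a low-cardinality text column from object dtype to category dtype usually shrinks its footprint dramatically, because pandas stores small integer codes plus one lookup table instead of millions of string objects.
import pandas as pd

#Hypothetical low-cardinality text column: 3 unique values repeated many times
df = pd.DataFrame({'color': ['red', 'green', 'blue'] * 1_000_000})
print(df['color'].memory_usage(deep=True))    #object dtype: counts every string element
df['color'] = df['color'].astype('category')  #store int codes + a small lookup table
print(df['color'].memory_usage(deep=True))    #category dtype: much smaller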
You can use the pandas-tfrecords package to write a pandas DataFrame to TFRecords.
Install pandas-tfrecords:
pip install pandas-tfrecords
Then try:
import pandas as pd
from pandas_tfrecords import pd2tf, tf2pd
df = pd.DataFrame({'A': [1, 2, 3], 'B': ['a', 'b', 'c'], 'C': [[1, 2], [3, 4], [5, 6]]})
# local
pd2tf(df, './tfrecords')      #write the dataframe to ./tfrecords
my_df = tf2pd('./tfrecords')  #read the tfrecords back into a dataframe
Hope this will help.
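If you want roughly 100,000 rows per TFRecord file, one simple approach (a sketch I haven't benchmarked; the chunk size and output folder names are only illustrative) is to slice the DataFrame yourself and call pd2tf once per slice:
from pandas_tfrecords import pd2tf

CHUNK = 100_000  #illustrative target rows per file
for i, start in enumerate(range(0, len(df), CHUNK)):
    chunk = df.iloc[start:start + CHUNK]
    pd2tf(chunk, f'./tfrecords/part_{i}')  #one output folder per chunk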
A workaround that might work is to export the pandas dataframe to a parquet file. This is one of the best ways to store data efficiently, since the data will be partitioned into several files. You can even decide which column to use for the partitions, so that each unique value of that column goes into its own file. More info in the to_parquet pandas docs.
Then you can do the batch processing using those partitions; a rough sketch of the idea is below.
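A minimal sketch, assuming pyarrow is installed; the column names 'batch_id' and 'label' are hypothetical helpers I'm introducing just to control partition size and to keep features and label together, and the exact filter syntax may vary with your pyarrow version:
import pandas as pd

#Add a helper column so each partition holds ~100,000 rows
X_train['batch_id'] = X_train.index // 100_000
X_train['label'] = training_y.values.ravel()  #keep features and label in one frame
X_train.to_parquet('train_parquet', partition_cols=['batch_id'])

#Later, load one partition at a time for a training pass
batch_df = pd.read_parquet('train_parquet', filters=[('batch_id', '=', 0)])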
Turning a file into TFRecords is (unfortunately) quite involved if you are only using tensorflow and pandas. As other answers have given clever ways to avoid this, I will show how to make the conversion using only tensorflow and pandas, if only for completeness' sake.
TRIGGER WARNING: lots of TF boilerplate. You have been warned.
import sys  #needed for sys.exc_info() in the loop below
import pandas as pd
import tensorflow as tf

#Creating fake data for demonstration
X_train = pd.DataFrame({'feat1': [1, 2, 3],
                        'feat2': ['one', 'two', 'three']})
training_y = pd.DataFrame({'target': [3.4, 11.67, 44444.1]})
X_train.to_csv('X_train.csv')
training_y.to_csv('training_y.csv')
#TFRecords boilerplate
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def serialize_example(index, feat1, feat2, target):
    """
    Creates a tf.train.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.train.Example-compatible
    # data type.
    feature = {
        'index': _int64_feature(index),
        'feat1': _int64_feature(feat1),
        'feat2': _bytes_feature(feat2),
        'target': _float_feature(target)
    }
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
#Loading the data in chunks of size 2. Change this to 1e5 in your code
CHUNKSIZE = 2
train = pd.read_csv('X_train.csv', chunksize=CHUNKSIZE)
y = pd.read_csv('training_y.csv', chunksize=CHUNKSIZE)

file_num = 0
while 1:
    try:
        print(f'{file_num}')
        #Getting the data from the two files
        df = pd.concat([train.get_chunk(), y.get_chunk()], axis=1)
        #Writing the TFRecord
        with tf.io.TFRecordWriter(f'Record_{file_num}.tfrec') as writer:
            for k in range(df.shape[0]):
                row = df.iloc[k, :]
                example = serialize_example(
                    df.index[k],
                    row['feat1'],
                    str.encode(row['feat2']),  #Note the str.encode to make tf play nice with strings
                    row['target'])
                writer.write(example)
        file_num += 1
    except:
        #get_chunk() raises once the files are exhausted, which ends the loop
        print(f'ERROR: {sys.exc_info()[0]}')
        break
The code above loads the files in chunks using the chunksize parameter of pandas.read_csv. If your files are not CSV, check whether the corresponding pandas read_<filetype> function has a chunksize parameter.
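If the source data is in Parquet instead (pandas.read_parquet has no chunksize), one alternative, assuming a reasonably recent pyarrow and a placeholder file name, is to stream the file in batches yourself and serialize each batch as shown above:
import pyarrow.parquet as pq

pf = pq.ParquetFile('X_train.parquet')             #placeholder file name
for batch in pf.iter_batches(batch_size=100_000):  #stream ~100k rows at a time
    df = batch.to_pandas()                         #each batch becomes a small DataFrame
    #...serialize the rows of df into a TFRecord file as shown above...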
In writing this, I leaned heavily on Chris Deotte's How to Create TFRecords kernel. I tried the official documentation first, but it fails to mention things like how to get tf.io to read your pandas strings, which made life significantly harder.
If, for whatever reason, you feel the need to check inside the TFRecords to make sure that the data is correct, you will need even more boilerplate. Enjoy.
#Reading the TFRecord
AUTO = tf.data.experimental.AUTOTUNE

def read_tfrecord(example):
    LABELED_TFREC_FORMAT = {
        "index": tf.io.FixedLenFeature([], tf.int64),
        "feat1": tf.io.FixedLenFeature([], tf.int64),
        "feat2": tf.io.FixedLenFeature([], tf.string),
        "target": tf.io.FixedLenFeature([], tf.float32)
    }
    example = tf.io.parse_single_example(example, LABELED_TFREC_FORMAT)
    index = example['index']
    feat1 = example['feat1']
    feat2 = example['feat2']
    target = example['target']
    return index, feat1, feat2, target

def load_dataset(filenames, labeled=True, ordered=False):
    # Read from TFRecords. For optimal performance, read from multiple files at once and
    # disregard data order. Order does not matter since we will be shuffling the data anyway.
    ignore_order = tf.data.Options()
    if not ordered:
        ignore_order.experimental_deterministic = False  # disable order, increase speed
    dataset = tf.data.TFRecordDataset(filenames, num_parallel_reads=AUTO)  # automatically interleaves reads from multiple files
    dataset = dataset.with_options(ignore_order)  # uses data as soon as it streams in, rather than in its original order
    dataset = dataset.map(read_tfrecord)
    # returns a dataset of (index, feat1, feat2, target) tuples
    return dataset

def get_training_dataset(filenames, batch_size=2):
    dataset = load_dataset(filenames, labeled=True)
    dataset = dataset.repeat()  # the training dataset must repeat for several epochs
    #dataset = dataset.shuffle(2048)
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(AUTO)  # prefetch next batch while training (autotune prefetch buffer size)
    return dataset
training_dataset = get_training_dataset(filenames= ['Record_0.tfrec', 'Record_1.tfrec'])
#training_dataset = training_dataset.unbatch().batch(20)
next(iter(training_dataset))
which outputs something like:
(<tf.Tensor: shape=(2,), dtype=int64, numpy=array([0, 2])>,
<tf.Tensor: shape=(2,), dtype=int64, numpy=array([1, 3])>,
<tf.Tensor: shape=(2,), dtype=string, numpy=array([b'one', b'three'], dtype=object)>,
<tf.Tensor: shape=(2,), dtype=float32, numpy=array([3.40000e+00, 4.44441e+04], dtype=float32)>)
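If you eventually want to feed this into a Keras model, here is a minimal, untested sketch of wiring the parsed tuples into model.fit. The to_xy helper, the layer sizes, and the regression loss are placeholders for this toy data; a real pipeline would encode the string feature instead of dropping it and would use a softmax output with a classification loss for the multi-class problem.
def to_xy(index, feat1, feat2, target):
    #Drop the index and the string column for this toy model (placeholder choice)
    x = tf.expand_dims(tf.cast(feat1, tf.float32), -1)
    y = tf.expand_dims(target, -1)
    return x, y

train_xy = training_dataset.map(to_xy)

model = tf.keras.Sequential([
    tf.keras.layers.Dense(16, activation='relu'),  #placeholder layer sizes
    tf.keras.layers.Dense(1)
])
model.compile(optimizer='adam', loss='mse')
model.fit(train_xy, steps_per_epoch=2, epochs=1)  #steps_per_epoch is required because of .repeat()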