I'm writing an input pipeline using tf.data.Dataset
. I'd like to use python code to load and transform my samples, the code returns a dictionary of tensors. Unfortunately I don't see how I can define that as the output type that is passed to tf.py_func
.
I have a workaround where my function returns list of tensors instead of a dictionary, but it makes my code less readable as I have 4 keys in that dict.
The code looks somehow as follows
file_list = ....
def load(file_name):
return {"image": np.zeros(...,dtype=np.float32),
"label": 1.0} # there is more labels, in the original code
ds = tf.data.Dataset.from_tensor_slices(file_list)
ds.shuffle(...)
out_type = [{'image':tf.float32, "label":tf.float32 }] # ????
ds.map(lambda x: tf.py_func(load, [x], out_type))
ds.batch(...)
ds.prefetch(1)
This answer is in response to Celso Franca's comment.
I did find a way but not returning a dict but rather using tf_example.SerializeToString()
.
The two functions were used for processing BERT input on the fly. It worked greate and saved me many hours of pre-processing upfront, while not losing any performance in the training process.
def _convert(label, text):
"""Decodes a csv-line to a TensorFlow Example, serialized as a string."""
np_label = label.numpy()
np_text = text.numpy()
tokens_a = tokenizer.tokenize(np_text)
# Account for [CLS] and [SEP] with "- 2"
if len(tokens_a) > seq_length - 2:
tokens_a = tokens_a[0: (seq_length - 2)]
tokens = []
segment_ids = []
tokens.append("[CLS]")
segment_ids.append(0)
for token in tokens_a:
tokens.append(token)
segment_ids.append(0)
tokens.append("[SEP]")
segment_ids.append(0)
input_ids = tokenizer.convert_tokens_to_ids(tokens)
# The mask has 1 for real tokens and 0 for padding tokens. Only real
# tokens are attended to.
input_mask = [1] * len(input_ids)
# Zero-pad up to the sequence length.
while len(input_ids) < seq_length:
input_ids.append(0)
input_mask.append(0)
segment_ids.append(0)
assert len(input_ids) == seq_length
assert len(input_mask) == seq_length
assert len(segment_ids) == seq_length
label_id = label_map[np_label]
features = collections.OrderedDict()
features["input_ids"] = create_int_feature(input_ids)
features["input_mask"] = create_int_feature(input_mask)
features["segment_ids"] = create_int_feature(segment_ids)
features["label_ids"] = create_int_feature([label_id])
features["is_real_example"] = create_int_feature([int(True)])
tf_example = tf.train.Example(features=tf.train.Features(feature=features))
# tf.py_function only accepts true tf datatypes like string
return tf_example.SerializeToString()
def _decode_record(record):
"""Decodes a record to a TensorFlow example."""
example = tf.parse_single_example(record, name_to_features)
# tf.Example only supports tf.int64, but the TPU only supports tf.int32.
# So cast all int64 to int32.
for name in list(example.keys()):
t = example[name]
if t.dtype == tf.int64:
t = tf.to_int32(t)
example[name] = t
return example
def input_fn(params):
"""The actual input function."""
filenames = tf.data.Dataset.list_files(file_pattern)
label_col = processor.get_label_col()
text_col = processor.get_text_col()
d = filenames.apply(
tf.contrib.data.parallel_interleave(
lambda filename: tf.data.experimental.CsvDataset(filename,
[tf.float32, tf.string],
select_cols=[label_col, text_col],
field_delim=delimiter,
header=True),
cycle_length=2))
if is_training:
d = d.repeat()
d = d.shuffle(buffer_size=100)
d = d.map(lambda label, text: tf.py_function(_convert, [label, text], tf.string))
d = d.map(_decode_record)
d = d.batch(batch_size=params["batch_size"], drop_remainder=drop_remainder)
return d
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With