I am trying to implement a TensorFlow dataset from a Python generator because my model consumes more and more memory, inevitably resulting in an OOM crash (see my question on that here). So, I am thinking that a generator might be better suited to avoiding these memory problems.
However, when I try to implement a generator for my model, I get this error: Local rendezvous is aborting with status: INVALID_ARGUMENT: Dataset had more than one element.
Here is my generator code:
def data_loading_generator(
    data_matrix: np.typing.NDArray,
    data_labels: np.typing.NDArray,
    window_length,
    dw
):
    num_rows = data_matrix.shape[0]
    y_onehot = np.stack(
        [np.flip(data_labels), data_labels],
        axis=1
    )
    data_segments = segment_data_batch(
        data_mat=data_matrix,
        w=window_length,
        dw=dw
    )
    for row_number in range(0, num_rows):
        yield (
            {f"input_{ii}": x[row_number, :] for ii, x in enumerate(data_segments)},
            (
                {"embedding_layer": data_labels[row_number]},
                {"classification": y_onehot[row_number, :]}
            )
        )
The function segment_data_batch takes in a matrix and outputs a list of overlapping segments from each row of the matrix, each of length window_length, with an overlap of window_length - dw between consecutive segments. The inputs to the neural net are labeled input_{ii}, and each input takes a single segment from the list of segments. I have labels for the data for comparison at the embedding layer and the classification layer. I initialize the data loader as shown below:
train_tf_dataset = tf.data.Dataset.from_generator(
    data_loading_generator,
    args=[X_train, Y_train, w_len, dw],
    output_signature=(
        {f"input_{ii}": tf.TensorSpec(shape=(w_len,), dtype=tf.float64, name=f"input_{ii}") for ii in range(number_windows)},
        (
            {"embedding_layer": tf.TensorSpec(shape=(), dtype=tf.int32, name="embedding_layer")},
            {"classification": tf.TensorSpec(shape=(2,), dtype=tf.int32, name="classification")}
        )
    )
)
Here, X_train is an N x M numpy array where each row is a single data point, and Y_train is an N-length numpy vector. When I call train_tf_dataset.take(1), I get the following:
<_TakeDataset element_spec=({'input_0': TensorSpec(shape=(50,), dtype=tf.float64, name='input_0'), 'input_1': TensorSpec(shape=(50,), dtype=tf.float64, name='input_1'), 'input_2': TensorSpec(shape=(50,), dtype=tf.float64, name='input_2'), 'input_3': TensorSpec(shape=(50,), dtype=tf.float64, name='input_3'), 'input_4': TensorSpec(shape=(50,), dtype=tf.float64, name='input_4'), 'input_5': TensorSpec(shape=(50,), dtype=tf.float64, name='input_5'), 'input_6': TensorSpec(shape=(50,), dtype=tf.float64, name='input_6'), 'input_7': TensorSpec(shape=(50,), dtype=tf.float64, name='input_7'), 'input_8': TensorSpec(shape=(50,), dtype=tf.float64, name='input_8'), 'input_9': TensorSpec(shape=(50,), dtype=tf.float64, name='input_9'), 'input_10': TensorSpec(shape=(50,), dtype=tf.float64, name='input_10'), 'input_11': TensorSpec(shape=(50,), dtype=tf.float64, name='input_11'), 'input_12': TensorSpec(shape=(50,), dtype=tf.float64, name='input_12'), 'input_13': TensorSpec(shape=(50,), dtype=tf.float64, name='input_13'), 'input_14': TensorSpec(shape=(50,), dtype=tf.float64, name='input_14'), 'input_15': TensorSpec(shape=(50,), dtype=tf.float64, name='input_15'), 'input_16': TensorSpec(shape=(50,), dtype=tf.float64, name='input_16'), ... }, ({'embedding_layer': TensorSpec(shape=(), dtype=tf.int32, name=None)}, {'classification': TensorSpec(shape=(2,), dtype=tf.int32, name=None)}))>
When I call train_tf_dataset.get_single_element(), I get the error described above, namely:
InvalidArgumentError: {{function_node __wrapped__DatasetToSingleElement_output_types_81_device_/job:localhost/replica:0/task:0/device:CPU:0}} Dataset had more than one element. [Op:DatasetToSingleElement] name:
What am I doing wrong here?
This error comes from how get_single_element() works:
tf.data.Dataset.get_single_element() (and the older tf.data.experimental.get_single_element) expects a dataset that contains exactly one element.
Your generator yields one element per row, so your dataset has N elements (number of rows), hence the error "more than one element".
You can try one of these 3 ways to take one sample:
# Eager-style: first element
example = next(iter(train_tf_dataset))
# Explicitly make it a single-element dataset first
example = tf.data.experimental.get_single_element(train_tf_dataset.take(1))
# Method form, available in newer TF versions:
example = train_tf_dataset.take(1).get_single_element()
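To see the behaviour in isolation, here is a minimal sketch on a toy dataset. It assumes a TensorFlow version recent enough to have the method form of get_single_element(); the three-element tf.data.Dataset.range(3) is only a stand-in for your generator-backed dataset.

```python
import tensorflow as tf

# Toy stand-in for the generator-backed dataset: three elements (0, 1, 2).
ds = tf.data.Dataset.range(3)

# Calling get_single_element() on a multi-element dataset fails
# with the same "more than one element" error as in the question.
try:
    ds.get_single_element()
    raised = False
except tf.errors.InvalidArgumentError:
    raised = True

# Restricting the dataset to one element first makes the call valid.
first = ds.take(1).get_single_element()

# Iterating eagerly gives the same first element.
also_first = next(iter(ds))
```

The same pattern applies to a dataset whose elements are nested dicts and tuples; the returned value then has the same nested structure as element_spec.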
EDIT - Additional memory improvements:
Do you really need float64 precision? It doubles your memory usage compared to float32, and Keras layers typically default to float32 anyway.
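As a rough illustration of the saving, the sketch below compares the footprint of the same array in both precisions. The array shape is made up for the example; in your code you would cast X_train once and change the TensorSpec dtype to tf.float32 to match.

```python
import numpy as np

# Made-up shape, standing in for X_train (N rows, M samples per row).
X64 = np.zeros((1000, 500), dtype=np.float64)

# Cast once, up front; the values are the same up to float32 precision.
X32 = X64.astype(np.float32)

bytes_64 = X64.nbytes  # 8 bytes per value
bytes_32 = X32.nbytes  # 4 bytes per value
print(f"float64: {bytes_64} bytes, float32: {bytes_32} bytes")
```

If the cast is applied, the output_signature has to declare dtype=tf.float32 as well, otherwise from_generator will complain about a dtype mismatch.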
It seems that you are precomputing all segments for all rows inside the generator:
data_segments = segment_data_batch(data_matrix, w=window_length, dw=dw)
for row_number in range(num_rows):
    ...
This defeats the purpose of a streaming generator, because every segment for every row is held in memory before the first element is yielded. You may want to compute the segments per row inside the loop or, better, build your dataset from rows and do the windowing in a .map using tf.signal.frame.
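A minimal sketch of the per-row variant is below. Since segment_data_batch is not shown in the question, segment_row is a hypothetical stand-in that assumes windows of length window_length taken every dw samples; the labels are also simplified to a single value per row, so adapt the yielded structure to your output_signature.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def segment_row(row, window_length, dw):
    """Hypothetical per-row stand-in for segment_data_batch.

    Returns overlapping windows of one row, each of length window_length,
    with consecutive windows starting dw samples apart.
    """
    return sliding_window_view(row, window_length)[::dw]


def lazy_row_generator(data_matrix, data_labels, window_length, dw):
    # Only one row's windows exist at a time, so memory use no longer
    # grows with the number of rows.
    for row, label in zip(data_matrix, data_labels):
        windows = segment_row(row, window_length, dw)
        inputs = {f"input_{ii}": w for ii, w in enumerate(windows)}
        yield inputs, label


# Tiny made-up example: 2 rows of 10 samples, windows of 4, step of 2.
X = np.arange(20, dtype=np.float32).reshape(2, 10)
y = np.array([0, 1])
first_inputs, first_label = next(lazy_row_generator(X, y, window_length=4, dw=2))
```

The windows here are views into the original row rather than copies, which keeps the per-element overhead small as well.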