Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apply TensorFlow Transform to transform/scale features in production

Overview

I followed the following guide to write TF Records, where I used tf.Transform to preprocess my features. Now, I would like to deploy my model, for which I need apply this preprocessing function on real live data.

My Approach

First, suppose I have 2 features:

features = ['amount', 'age']

I have the transform_fn from the Apache Beam, residing in working_dir=gs://path-to-transform-fn/

Then I load the transform function using:

tf_transform_output = tft.TFTransformOutput(working_dir)

I thought that the easiest way to serve in in production was to get a numpy array of processed data, and call model.predict() (I am using Keras model).

To do this, I thought transform_raw_features() method is exactly what I need.

However, it seems that after building the schema:

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))

I get:

AttributeError: 'Tensor' object has no attribute 'indices'

Now, I am assuming this happens because I used tf.VarLenFeature() when I defined schema in my preprocessing_fn.

def preprocessing_fn(inputs):
    outputs = inputs.copy()

    for _ in features:
        outputs[_] = tft.scale_to_z_score(outputs[_])

And I build the metadata using:

RAW_DATA_FEATURE_SPEC = {}
for _ in features:
    RAW_DATA_FEATURE_SPEC[_] = tf.VarLenFeature(dtype=tf.float32)
    RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    dataset_schema.from_feature_spec(RAW_DATA_FEATURE_SPEC))

So in short, given a dictionary:

d = {'amount': [50], 'age': [32]}, I would like to apply this transform_fn, and scale these values appropriately to input into my model for prediction. This dictionary is exactly the format of my PCollection before the data is processed by the pre_processing() function.

Pipeline Structure:

class BeamProccess():

def __init__(self):

    # init 

    self.run()


def run(self):

    def preprocessing_fn(inputs):

         # outputs = { 'id' : [list], 'amount': [list], 'age': [list] }
         return outputs

    with beam.Pipeline(options=self.pipe_opt) as p:
        with beam_impl.Context(temp_dir=self.google_cloud_options.temp_location):
            data = p | "read_table" >> beam.io.Read(table_bq) \
            | "create_data" >> beam.ParDo(ProcessFn())

            transformed_dataset, transform_fn = (
                        (train, RAW_DATA_METADATA) | beam_impl.AnalyzeAndTransformDataset(
                    preprocessing_fn))

            transformed_data, transformed_metadata = transformed_dataset

            transformed_data | "WriteTrainTFRecords" >> tfrecordio.WriteToTFRecord(
                    file_path_prefix=self.JOB_DIR + '/train/data',
                    file_name_suffix='.tfrecord',
                    coder=example_proto_coder.ExampleProtoCoder(transformed_metadata.schema))

            _ = (
                        transform_fn
                        | 'WriteTransformFn' >>
                        transform_fn_io.WriteTransformFn(path=self.JOB_DIR + '/transform/'))

And finally the ParDo() is:

class ProcessFn(beam.DoFn):

    def process(self, element):

        yield { 'id' : [list], 'amount': [list], 'age': [list] }
like image 306
user10430178 Avatar asked Jan 07 '19 20:01

user10430178


People also ask

Why is tf transformation needed?

tf. Transform is a library for TensorFlow that allows users to define preprocessing pipelines and run these using large scale data processing frameworks, while also exporting the pipeline in a way that can be run as part of a TensorFlow graph.

What is tf transformation?

TensorFlow Transform is a library for preprocessing input data for TensorFlow, including creating features that require a full pass over the training dataset. For example, using TensorFlow Transform you could: Normalize an input value by using the mean and standard deviation.

Which are the three main methods of getting data into a TensorFlow program?

Feeding: Python code provides the data when running each step. Reading from files: an input pipeline reads the data from files at the beginning of a TensorFlow graph. Preloaded data: a constant or variable in the TensorFlow graph holds all the data (for small data sets).


1 Answers

The problem is with the snippet

raw_features = {}
for k in features:
    raw_features.update({k: tf.constant(1)})

print(tf_transform_output.transform_raw_features(raw_features))

In this code you construct a dictionary where the values are tensors. Like you said, this won't work for a VarLenFeature. Instead of using tf.constant try using tf.placeholder for a a FixedLenFeature and tf.sparse_placeholder for a VarLenFeature.

like image 178
Kester Tong Avatar answered Sep 17 '22 19:09

Kester Tong