 

"Transform node AppliedPTransform was not replaced as expected" error with the DirectRunner when mixing streaming and non-streaming sources in Apache Beam

When I declare a pipeline with two sources (one GCS source and one Pub/Sub source), I get an error, but only with the Beam DirectRunner. With the Google Dataflow runner it works fine. My pipeline runs with the streaming option enabled (streaming=True).

import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    gcsEventsColl = (p
        | "Read from GCS" >> beam.io.ReadFromText("gs://sample_events_for_beam/*.log")
        | "convert to dict" >> beam.Map(json.loads))
    liveEventsColl = (p
        | "Read from Pubsub" >> beam.io.ReadFromPubSub(topic="projects/axxxx/topics/input_topic")
        | "convert to dict2" >> beam.Map(json.loads))
    input_rec = (gcsEventsColl, liveEventsColl) | "flatten" >> beam.Flatten()

It seems that the DirectRunner applies some incompatible transformation to ReadFromText, but I don't understand why.

  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 564, in run
    return self.runner.run_pipeline(self, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 131, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 529, in run_pipeline
    pipeline.replace_all(_get_transform_overrides(options))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 504, in replace_all
    self._check_replacement(override)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 478, in _check_replacement
    self.visit(ReplacementValidator())
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 611, in visit
    self._root_transform().visit(visitor, self, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1195, in visit
    part.visit(visitor, pipeline, visited)
  [Previous line repeated 4 more times]
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 1198, in visit
    visitor.visit_transform(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 476, in visit_transform
    transform_node)
RuntimeError: Transform node AppliedPTransform(Read from GCS/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) was not replaced as expected.

I suspect it is related to this code, but I'm not sure: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/direct_runner.py#L375

Thanks for your help.

Christophe Rodriguez asked Nov 08 '25 11:11



1 Answer

This is an internal failure caused by a bug. The error message means that the Python DirectRunner corrupted the pipeline graph while it was rewriting (overriding) transforms.
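To make that explanation more concrete, here is a toy model in plain Python of what the traceback shows happening in pipeline.replace_all: overrides are applied one after another, then the graph is walked again and a RuntimeError is raised for any node that still matches an override. This is a hypothetical sketch, not Beam's code; Node, Override, _apply, replace_all, gbk_override and read_override are illustrative names. It assumes, purely for illustration, that a later override expands a Read into a subgraph containing a group-by-key node after the group-by-key override has already run. Whether this is exactly what goes wrong inside the DirectRunner is an assumption.

```python
# Toy model (hypothetical, NOT Apache Beam's code) of ordered transform
# overrides followed by a validation pass, in the spirit of replace_all.
from dataclasses import dataclass, field
from typing import Callable, Iterator, List


@dataclass
class Node:
    kind: str                                          # e.g. "Read", "GroupByKeyOnly"
    name: str                                          # full label, e.g. "Read from GCS"
    parts: List["Node"] = field(default_factory=list)  # nested sub-transforms

    def walk(self) -> Iterator["Node"]:
        # Depth-first traversal over this node and every nested part.
        yield self
        for part in self.parts:
            yield from part.walk()


@dataclass
class Override:
    matches: Callable[[Node], bool]
    replace: Callable[[Node], Node]


def _apply(node: Node, override: Override) -> Node:
    # Swap out a matching node; otherwise recurse into its parts.
    if override.matches(node):
        return override.replace(node)
    node.parts = [_apply(part, override) for part in node.parts]
    return node


def replace_all(root: Node, overrides: List[Override]) -> Node:
    for override in overrides:      # 1) apply overrides strictly in order
        root = _apply(root, override)
    for override in overrides:      # 2) re-check: nothing may still match
        for node in root.walk():
            if override.matches(node):
                raise RuntimeError(
                    f"Transform node {node.name} was not replaced as expected.")
    return root


# Earlier override (hypothetical): swap group-by-key nodes for a streaming variant.
gbk_override = Override(
    matches=lambda n: n.kind == "GroupByKeyOnly",
    replace=lambda n: Node("StreamingGroupByKeyOnly", n.name))

# Later override (hypothetical): expand a Read into a subgraph that itself
# contains a group-by-key node, loosely mirroring the label in the traceback.
read_override = Override(
    matches=lambda n: n.kind == "Read",
    replace=lambda n: Node("SDFRead", n.name,
                           [Node("GroupByKeyOnly", n.name + "/GroupByKey")]))

try:
    replace_all(Node("Root", "root", [Node("Read", "Read from GCS")]),
                [gbk_override, read_override])
except RuntimeError as err:
    print(err)  # prints: Transform node Read from GCS/GroupByKey was not replaced as expected.
```

In this toy model, passing [read_override, gbk_override] instead lets the check pass, because the group-by-key node created by the read expansion exists before the group-by-key override runs. It illustrates why this kind of failure sits in the runner's rewrite step rather than in the user's pipeline code.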

Kenn Knowles answered Nov 10 '25 02:11
