As the documentation for this feature is only available for Java, I could not really understand what it means.
It states - "While ParDo always produces a main output PCollection (as the return value from apply), you can also have your ParDo produce any number of additional output PCollections. If you choose to have multiple outputs, your ParDo will return all of the output PCollections (including the main output) bundled together. For example, in Java, the output PCollections are bundled in a type-safe PCollectionTuple."
I understand what "bundled together" means, but if I am yielding a tagged element in my DoFn, does it emit that element right away (with the other outputs still empty) and emit the other outputs as they are encountered in the code? Or does it wait until all the yields for an input element are ready and then output them all together as a bundle?
There isn't much clarity around this in the documentation. I think it doesn't wait and just yields as outputs are encountered, but I still need to understand what is actually happening.
In Apache Beam, a side input allows us to provide additional inputs to a ParDo transform. That is, in addition to the main input PCollection, we can pass extra data to a ParDo transform in the form of side inputs. The DoFn can access a side input each time it processes an element of the main input PCollection.
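As a minimal sketch of this (the names FilterLongWordsFn and max_len are made up for illustration), the side input is passed as an extra argument to ParDo, and the DoFn receives its value on every call to process:

import apache_beam as beam
from apache_beam.pvalue import AsSingleton

class FilterLongWordsFn(beam.DoFn):
    def process(self, word, max_len):
        # max_len is the side input value, available on every call.
        if len(word) <= max_len:
            yield word

with beam.Pipeline() as p:
    words = p | 'Words' >> beam.Create(['a', 'bb', 'ccc', 'dddd'])
    # A one-element PCollection used as the side input.
    max_len = p | 'MaxLen' >> beam.Create([3])

    (words
     | beam.ParDo(FilterLongWordsFn(), AsSingleton(max_len))
     | beam.Map(print))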
ParDo is the core parallel processing operation in the Apache Beam SDKs. It invokes a user-specified function on each element of the input PCollection and collects the zero or more output elements into an output PCollection. The ParDo transform processes elements independently and possibly in parallel.
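For example, here is a minimal ParDo that uppercases each element (the DoFn is invented for illustration):

import apache_beam as beam

class ToUpperFn(beam.DoFn):
    def process(self, element):
        # A DoFn may yield zero or more elements per input element.
        yield element.upper()

with beam.Pipeline() as p:
    (p
     | beam.Create(['hello', 'world'])
     | beam.ParDo(ToUpperFn())
     | beam.Map(print))  # prints HELLO, WORLD (order not guaranteed)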
A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create), and can be passed as the inputs of other PTransforms.
There is no way to check the size of a PCollection without applying a PTransform to it (such as beam.combiners.Count.Globally() or a CombineGlobally with a custom CombineFn), because a PCollection is not like a typical in-memory Collection in the Java SDK or elsewhere.
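For instance, counting the elements produces another PCollection rather than a plain number (a minimal sketch using the default runner):

import apache_beam as beam

with beam.Pipeline() as p:
    items = p | beam.Create(['a', 'b', 'c'])  # a bounded PCollection
    # The size is itself a single-element PCollection, not an int.
    size = items | beam.combiners.Count.Globally()
    size | beam.Map(print)  # prints: 3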
The best way to answer this is with an example, and such an example is available in Beam itself.
Suppose that you want to run a word count pipeline (e.g. count the number of times each word appears in a document). For this you need to split the lines of a file into individual words. Consider that you also want to output the character count of each line individually. Your splitting transform would look like this:
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(known_args.input)  # Read in the file

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    split_lines_result = (
        lines
        | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
            main='words'))

    words = split_lines_result['words']
    character_count = split_lines_result[
        SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
In this case, each of these is a different PCollection with the appropriate elements. The DoFn is in charge of splitting its outputs, and it does so by tagging elements. See:
import re

import apache_beam as beam
from apache_beam import pvalue

class SplitLinesToWordsFn(beam.DoFn):
    OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'

    def process(self, element):
        # Yield the character count (an integer) of the line to the
        # OUTPUT_TAG_CHARACTER_COUNT tagged collection.
        yield pvalue.TaggedOutput(
            self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

        words = re.findall(r'[A-Za-z\']+', element)
        for word in words:
            # Yield each word to the main (untagged) collection.
            yield word
As you can see, for the main output you do not need to tag the elements, but for the other outputs you do. Note that elements are emitted as they are yielded: each tagged element is routed to its corresponding output collection as it is produced, rather than being held back until all the yields for an input element are ready.
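Continuing the pipeline above (the transform labels here are illustrative), each output can then be consumed independently:

    # Each tagged output is an ordinary PCollection and can be
    # processed independently downstream.
    word_counts = words | 'CountWords' >> beam.combiners.Count.PerElement()
    total_chars = character_count | 'SumChars' >> beam.CombineGlobally(sum)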