I am new to the project, and I am trying to create a connector between Dataflow and a database.
The documentation clearly states that I should use a Source and a Sink but I see a lot of people using directly a PTransform associated with a PInput or a PDone.
The source/sink API is in experimental (which explaines all the examples with the PTransform), but seems more easy to integrate it with a custom runner (ie: spark for example).
If I refer to the code, the two methods are used. I cannot see any use case where it will be more interesting to use the PTransform API.
Is the Source/Sink API is supposed to remplace the PTranform API?
Did I miss something that clearly differentiate the two methods?
Is the Source/Sink API stable enough to be considered the good way to code inputs and outputs?
Thank for you advice!
A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create ), and can be passed as the inputs of other PTransforms.
PTransform: A PTransform is an operation that needs to be performed on a single data element. It takes an input PCollection and transforms it into zero or more output PCollections.
ParDo is the computational pattern of per-element computation. It has some variations, but you don't need to worry about that for this question. The DoFn , here I called it fn , is the logic that is applied to each element.
Aggregates all input elements by their key and allows downstream processing to consume all values associated with the key. While GroupByKey performs this operation over a single input collection and thus a single type of input values, CoGroupByKey operates over multiple input collections.
The philosophy of Dataflow is that PTransform is the main unit of abstraction and composability, i.e., any self-contained data processing task should be encapsulated as a PTransform. This includes the task of connecting to a third-party storage system: ingesting data from somewhere or exporting it to somewhere.
Take, for example, Google Cloud Datastore. In the code snippet:
    PCollection<Entity> entities =
      p.apply(DatastoreIO.readFrom(dataset, query));
    ...
    p.apply(some processing)
     .apply(DatastoreIO.writeTo(dataset));
the return type of DatastoreIO.readFrom(dataset, query) is a subclass of PTransform<PBegin, PCollection<Entity>>, and the type of DatastoreIO.writeTo(dataset) is a subclass of PTransform<PCollection<Entity>, PDone>.
It is true that these functions are under the hood implemented using the Source and Sink classes, but to a user who just wants to read or write something to Datastore, that's an implementation detail that usually should not matter (however, see the note at the end of this answer about exposing the Source or Sink class). Any connector, or for that matter, any other data processing task is a PTransform.
Note: Currently connectors that read from somewhere tend to be PTransform<PBegin, PCollection<T>>, and connectors that write to somewhere tend to be PTransform<PCollection<T>, PDone>, but we are considering options to make it easier to use connectors in more flexible ways (for example, reading from a PCollection of filenames).
However, of course, this detail matters to somebody who wants to implement a new connector. In particular, you may ask:
Q: Why do I need the Source and Sink classes at all, if I could just implement my connector as a PTransform?
A: If you can implement your connector by just using the built-in transforms (such as ParDo, GroupByKey etc.), that's a perfectly valid way to develop a connector. However, the Source and Sink classes provide some low-level capabilities that, in case you need them, would be cumbersome or impossible to develop yourself.
For example, BoundedSource and UnboundedSource provide hooks for controlling how parallelization happens (both initial and dynamic work rebalancing - BoundedSource.splitIntoBundles, BoundedReader.splitAtFraction), while these hooks are not currently exposed for arbitrary DoFns.
You could technically implement a parser for a file format by writing a DoFn<FilePath, SomeRecord> that takes the filename as input, reads the file and emits SomeRecord, but this DoFn would not be able to dynamically parallelize reading parts of the file onto multiple workers in case the file turned out to be very large at runtime. On the other hand, FileBasedSource has this capability built-in, as well as handling of glob filepatterns and such.
Likewise, you could try implementing a connector to a streaming system by implementing a DoFn that takes a dummy element as input, establishes a connection and streams all elements into ProcessingContext.output(), but DoFns currently don't support writing unbounded amounts of output from a single bundle, nor do they explicitly support the checkpointing and deduplication machinery needed for the strong consistency guarantees Dataflow gives to streaming pipelines. UnboundedSource, on the other hand, supports all this.
Sink (more precisely, the Write.to() PTransform) is also interesting: it is just a composite transform that you could write yourself if you wanted to (i.e. it has no hard-coded support in the Dataflow runner or backend), but it was developed with consideration for typical distributed fault tolerance issues that arise when writing data to a storage system in parallel, and it provides hooks that force you to keep those issues in mind: e.g., because bundles of data are written in parallel, and some bundles may be retried or duplicated for fault tolerance, there is a hook for "committing" just the results of the successfully completed bundles (WriteOperation.finalize).
To summarize: using Source or Sink APIs to develop a connector helps you structure your code in a way that will work well in a distributed processing setting, and the source APIs give you access to advanced capabilities of the framework. But if your connector is a very simple one that needs neither, then you are free to just assemble your connector from other built-in transforms.
Q: Suppose I decide to make use of Source and Sink. Then how do I package my connector as a library: should I just provide the Source or Sink class, or should I wrap it into a PTransform?
A: Your connector should ultimately be packaged as a PTransform, so that the user can just p.apply() it in their pipeline. However, under the hood your transform can use Source and Sink classes.
A common pattern is to expose the Source and Sink classes as well, making use of the Fluent Builder pattern, and letting the user wrap them into a Read.from() or Write.to() transform themselves, but this is not a strict requirement.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With