
What does reshuffling, in the context of exactly-once processing in the BigQuery sink, mean?

I'm reading an article on exactly-once processing implemented by some Dataflow sources and sinks and I'm having trouble understanding the example on the BigQuery sink. From the article:

Generating a random UUID is a non-deterministic operation, so we must add a reshuffle before we insert into BigQuery. Once that is done, any retries by Cloud Dataflow will always use the same UUID that was shuffled. Duplicate attempts to insert into BigQuery will always have the same insert id, so BigQuery is able to filter them.

// Apply a unique identifier to each record.
// (Record and RecordWithId stand for the article's placeholder element types.)
c
 .apply(ParDo.of(new DoFn<Record, KV<Integer, RecordWithId>>() {
   @ProcessElement
   public void processElement(ProcessContext context) {
     String uniqueId = UUID.randomUUID().toString();
     // Key by a random shard (0-49) so the reshuffle spreads load evenly.
     context.output(KV.of(ThreadLocalRandom.current().nextInt(0, 50),
         new RecordWithId(context.element(), uniqueId)));
   }
 }))
 // Reshuffle the data so that the applied identifiers are stable and will not change.
 .apply(Reshuffle.<Integer, RecordWithId>of())
 // Stream records into BigQuery with unique ids for deduplication.
 .apply(ParDo.of(new DoFn<KV<Integer, RecordWithId>, Void>() {
   @ProcessElement
   public void processElement(ProcessContext context) {
     insertIntoBigQuery(context.element().getValue().record(),
         context.element().getValue().id());
   }
 }));

What does reshuffle mean, and how can it prevent the generation of a different UUID for the same insert on subsequent retries?

MassyB asked Sep 26 '18



2 Answers

Reshuffle groups the data in a different way. However, here it is used for its side-effects: checkpointing and deduplication.

Without the reshuffle, if the same task generates the UUID and inserts the data into BigQuery, there is a risk that the worker restarts and the new worker generates a new UUID and sends a different row to BigQuery, resulting in duplicate rows.
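The failure mode can be sketched in plain Java without Beam (the class and method names here are illustrative, not from the article): because the id is generated inside the same attempt that performs the insert, a retried attempt produces a different id for the same logical record.

```java
import java.util.UUID;

public class RetryWithoutReshuffle {

    // Models one attempt of a fused "generate id + insert" stage: the insert id
    // is created inside the same attempt that writes to the sink.
    static String attemptInsert() {
        // Non-deterministic: a new value on every attempt.
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // First attempt inserts, then the worker crashes before committing.
        String firstId = attemptInsert();
        // Dataflow retries the bundle; the id is regenerated.
        String retryId = attemptInsert();
        // The ids differ, so BigQuery cannot recognize the retry as a
        // duplicate and would keep both rows.
        System.out.println(firstId.equals(retryId)); // false
    }
}
```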

The reshuffle operation splits UUID generation and the BigQuery insert into two steps, and inserts checkpointing and deduplication between them:

  1. First, UUIDs are generated and sent to the reshuffle. If the UUID-generating worker is restarted, that is fine: the reshuffle deduplicates rows, eliminating data from failed or restarted workers.
  2. The generated UUIDs are checkpointed by the shuffle operation.
  3. The BigQuery insert worker uses the checkpointed UUIDs, so even if it is restarted, it sends exactly the same data to BigQuery.
  4. BigQuery deduplicates the data using these UUIDs, so duplicates from a restarted insert worker are eliminated in BigQuery.
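The steps above can be simulated in plain Java (a sketch with made-up names, not Beam or BigQuery API: the first map stands in for the shuffle checkpoint, the second for BigQuery's insert-id-based deduplication table):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class ReshuffleSketch {

    // Steps 1-2: UUIDs are generated once and checkpointed by the shuffle,
    // so any retry of the insert stage sees the same ids.
    static Map<String, String> checkpoint(List<String> records) {
        Map<String, String> ids = new LinkedHashMap<>(); // record -> insert id
        for (String r : records) {
            ids.put(r, UUID.randomUUID().toString());
        }
        return ids;
    }

    // Steps 3-4: the insert worker may run more than once, but the sink
    // deduplicates on insert id, so each row is stored exactly once.
    static Map<String, String> insertWithRetries(Map<String, String> checkpointed,
                                                 int attempts) {
        Map<String, String> table = new LinkedHashMap<>(); // insert id -> row
        for (int attempt = 0; attempt < attempts; attempt++) {
            for (Map.Entry<String, String> e : checkpointed.entrySet()) {
                table.putIfAbsent(e.getValue(), e.getKey()); // dedup by id
            }
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> ids = checkpoint(List.of("e0", "e1", "e2"));
        // The insert worker is restarted and replays the whole bundle twice...
        Map<String, String> table = insertWithRetries(ids, 2);
        // ...yet every record lands exactly once.
        System.out.println(table.size()); // 3
    }
}
```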
Michael Entin answered Oct 21 '22


I think the article provides a good explanation of why "reshuffle" helps move from "at least once" to "exactly once":

Specifically, the window might attempt to fire with element e0, e1, e2, but the worker crashes before committing the window processing (but not before those elements are sent as a side effect). When the worker restarts the window will fire again, but now a late element e3 shows up. Since this element shows up before the window is committed, it’s not counted as late data, so the DoFn is called again with elements e0, e1, e2, e3. These are then sent to the side-effect operation. Idempotency does not help here, as different logical record sets were sent each time.

There are other ways non-determinism can be introduced. The standard way to address this risk is to rely on the fact that Cloud Dataflow currently guarantees that only one version of a DoFn's output can make it past a shuffle boundary.

You can also check Reshuffle's docs:

  • https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/transforms/Reshuffle.html

There's a note there about deprecating this class, so future implementations of BigQueryIO might differ.

Felipe Hoffa answered Oct 21 '22