Hi, I'm trying to run a pipeline that calculates diffs between messages published to PubSub as 30-second heartbeats* (10K streams, each heartbeating every 30 seconds). I don't care about 100% data completeness, but I'd like to understand what the watermark heuristic is for PubsubIO (and whether I can tweak it), to determine whether I can ignore late data with sufficiently low loss.
*Note: the PubSub topic provides [potentially days' worth of] persistence in case we have to take down the pipeline, so it's important that the heuristic work well with a backlogged subscription.
Can someone explain how the watermark is calculated (assuming timestampLabel() is used), and how it can be adjusted, if at all?
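For reference, we attach event timestamps via a message attribute, roughly like this (the "ts" attribute name is ours; newer Beam releases call the same thing withTimestampAttribute()):

    // Dataflow SDK 1.x naming; our publishers set the "ts" attribute to the
    // event time of the heartbeat (millis since epoch).
    PCollection<String> messages = pipeline.apply(
        PubsubIO.Read.named("ReadHeartbeats")
            .topic("projects/my-project/topics/heartbeats")
            .timestampLabel("ts"));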
Here's a quick description of how we compute the PubSub watermark:
Our goal is to build a reasonable heuristic watermark for data sent into our streaming pipeline via PubSub. We make some assumptions about the source that is sending data into PubSub. Specifically, we assume that the timestamps of the original data are "well behaved"; in other words, we expect a bounded amount of out-of-order timestamps on the source data before it is sent to PubSub. Any data sent with timestamps outside the allowed out-of-order bounds will be considered late data. In our current implementation this bound is 10 seconds, meaning that reordering of timestamps by up to 10 seconds before sending to PubSub will not create late data. We call this value the estimation band. The problem of building a PubSub watermark then reduces to ensuring that no additional data becomes late due to transmission via PubSub.
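To make the band concrete, here is a minimal sketch (not the actual implementation) of what "bounded out-of-orderness" means for a source, with the 10-second band hard-coded:

    import java.time.Duration;
    import java.time.Instant;

    // Illustrative only: a source satisfies the estimation band if no event
    // timestamp is more than BAND older than the newest one seen so far.
    class OutOfOrderBoundChecker {
      private static final Duration BAND = Duration.ofSeconds(10);
      private Instant maxEventTime = Instant.MIN;

      // Returns true if this event timestamp stays within the allowed
      // out-of-order bound; false means it would be considered late data.
      boolean withinBand(Instant eventTime) {
        if (eventTime.isAfter(maxEventTime)) {
          maxEventTime = eventTime;
          return true;
        }
        return !eventTime.isBefore(maxEventTime.minus(BAND));
      }
    }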
What are the challenges we face with PubSub? Since PubSub does not guarantee ordering, we must have some additional metadata to know enough about the backlog. Luckily, PubSub provides a measurement of backlog in terms of the "oldest unacked publish timestamp". This is not the same as the event timestamp of our message: since PubSub is agnostic to the application-level metadata being sent through it, this is instead the timestamp of when the message was ingested by PubSub.
While this measurement sounds similar to a watermark, it is not the same. We cannot simply use the oldest unacked publish timestamp as the watermark: these timestamps are not equal to the event timestamps, and in the case that historical (past) data is being sent, they may be arbitrarily far apart. The ordering of these timestamps may also differ since, as mentioned above, we allow a limited amount of reordering. However, we can use this as a measure of backlog to learn enough about the event timestamps present in the backlog to establish a reasonable watermark, as follows.
We call the subscription on which data is arriving the base subscription. Looking at our base subscription, we see that messages may arrive out of order. We label each message with its PubSub publish timestamp "pt" and its event-time timestamp "et". Note that the two time domains can be unrelated: for example, a fresh heartbeat (pt = 12:00:05, et = 12:00:03) may sit next to a replayed historical message (pt = 12:00:06, et = 03:00:00).
Some messages on the base subscription are unacknowledged, forming a backlog. This may be because they have not yet been delivered, or because they have been delivered but not yet processed. Remember also that pulls from this subscription are distributed across multiple shards. Thus it is not possible to say, just by looking at the base subscription, what our watermark should be.
We proceed by creating a second, metadata-only tracking subscription, which is used to effectively inspect the backlog of the base subscription and take the minimum of the event timestamps in the backlog. By maintaining little or no backlog on the tracking subscription, we can inspect the messages ahead of the base subscription's oldest unacked message.
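A tracking subscription is simply a second subscription on the same topic. Here's a minimal sketch using the Pub/Sub admin client, with invented project/topic/subscription names:

    import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.PushConfig;

    // Sketch: both subscriptions receive every message published to the
    // topic; the tracking one is pulled cheaply and acked immediately.
    public class CreateTrackingSubscription {
      public static void main(String[] args) throws Exception {
        try (SubscriptionAdminClient admin = SubscriptionAdminClient.create()) {
          admin.createSubscription(
              ProjectSubscriptionName.of("my-project", "heartbeats-tracking"),
              ProjectTopicName.of("my-project", "heartbeats"),
              PushConfig.getDefaultInstance(), // default = pull subscription
              10 /* ackDeadlineSeconds */);
        }
      }
    }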
We stay caught up on the tracking subscription by ensuring that pulling from it is computationally inexpensive. Conversely, if we fall sufficiently behind on the tracking subscription, we stop advancing the watermark. We keep pulling inexpensive as follows:
We ack the messages on the tracking subscription as soon as possible, once we have durably persisted metadata about the publish and event timestamps of the messages. We store this metadata in a sparse histogram format to minimize the amount of space used and the size of the durable writes.
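For illustration, a sparse histogram over timestamps can be as simple as a sorted map of non-empty buckets; the one-second bucket size here is an assumption:

    import java.util.SortedMap;
    import java.util.TreeMap;

    // Sketch of a sparse histogram over event timestamps: only buckets that
    // actually contain messages are stored, keeping durable writes small.
    class SparseTimestampHistogram {
      private static final long BUCKET_MILLIS = 1_000; // 1s buckets (assumed)
      private final SortedMap<Long, Long> counts = new TreeMap<>();

      void add(long eventTimeMillis) {
        long bucket = eventTimeMillis - (eventTimeMillis % BUCKET_MILLIS);
        counts.merge(bucket, 1L, Long::sum);
      }

      // Smallest bucket (a lower bound on the min event time), or null if empty.
      Long minBucket() {
        return counts.isEmpty() ? null : counts.firstKey();
      }
    }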
Finally, we ensure that we have enough data to make a reasonable watermark estimate. We take the band of event timestamps whose publish timestamps fall in the range

    [ min(base sub oldest unacked, tracking sub oldest unacked - 10 sec),
      tracking sub oldest unacked ]
This ensures that we consider all event timestamps in the backlog, or, if the backlog is small, the most recent estimation band, when making a watermark estimate.
The watermark value is then computed to be the minimum event time in the band.
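Putting the band and the minimum together, a toy version of the computation (all names invented; the real logic lives inside the service) might look like:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.List;

    // Toy sketch of the band-and-minimum rule described above.
    class WatermarkEstimator {
      private static final Duration BAND = Duration.ofSeconds(10);

      // Per-message metadata persisted from the tracking subscription.
      record Meta(Instant publishTime, Instant eventTime) {}

      static Instant estimate(List<Meta> trackedMetadata,
                              Instant baseOldestUnacked,
                              Instant trackingOldestUnacked) {
        // Band of publish timestamps to consider:
        // [min(base oldest unacked, tracking oldest unacked - 10 sec),
        //  tracking oldest unacked]
        Instant bandStart = trackingOldestUnacked.minus(BAND);
        Instant lo = baseOldestUnacked.isBefore(bandStart) ? baseOldestUnacked : bandStart;
        Instant hi = trackingOldestUnacked;

        Instant watermark = null;
        for (Meta m : trackedMetadata) {
          boolean inBand =
              !m.publishTime().isBefore(lo) && !m.publishTime().isAfter(hi);
          if (inBand && (watermark == null || m.eventTime().isBefore(watermark))) {
            watermark = m.eventTime(); // minimum event time in the band
          }
        }
        return watermark; // null means not enough data to estimate
      }
    }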
Observe also that this method is correct but produces an overly conservative watermark: since we consider all messages ahead of the base subscription's oldest unacked message on the tracking subscription, we may include event timestamps in the watermark estimate for messages that have already been acknowledged.
Additionally, there are a few heuristics to ensure progress. The above method works well in the case of dense, frequently arriving data. In the case of sparse or infrequent data, there may not be enough recent messages to build a reasonable estimate. If we have not seen data on the subscription in more than two minutes (and there's no backlog), we advance the watermark to near real time. This ensures that the watermark and the pipeline continue to make progress even if no more messages are forthcoming.
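In sketch form, that progress heuristic is roughly the following (the two-minute threshold is from the description above; the exact "near real time" slack is an assumption):

    import java.time.Duration;
    import java.time.Instant;

    // Sketch: if the subscription has been idle (no messages, no backlog)
    // for more than two minutes, advance the watermark to near real time.
    class IdleAdvance {
      private static final Duration IDLE_THRESHOLD = Duration.ofMinutes(2);
      private static final Duration SLACK = Duration.ofSeconds(10); // assumed

      static Instant maybeAdvance(Instant currentWatermark, Instant lastMessageSeen,
                                  boolean hasBacklog, Instant now) {
        if (!hasBacklog
            && Duration.between(lastMessageSeen, now).compareTo(IDLE_THRESHOLD) > 0) {
          Instant nearRealTime = now.minus(SLACK);
          // Watermarks never move backwards.
          return nearRealTime.isAfter(currentWatermark) ? nearRealTime : currentWatermark;
        }
        return currentWatermark;
      }
    }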
All of the above ensures that as long as the source data's event-timestamp reordering stays within the estimation band, there will be no additional late data.