 

Beam pipeline does not produce any output after GroupByKey with windowing [duplicate]

I am ingesting data via Pub/Sub into a Dataflow pipeline running in unbounded (streaming) mode. The data are basically coordinates with timestamps captured from tracking devices. The messages arrive in batches, where each batch might contain 1..n messages. For a certain period there might be no messages arriving at all; they might be delivered later on (or not). We use the timestamp (in UTC) of each coordinate as an attribute on the Pub/Sub message and read it in the pipeline via a timestamp label:

pipeline.apply(PubsubIO.Read.topic("new").timestampLabel("timestamp"));
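
For context, a simplified sketch of such a pipeline with the Dataflow Java SDK 1.x (not the actual job; the topic path, the constant key, and the one-minute fixed windows are placeholders):

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import org.joda.time.Duration;

public class TrackingPipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    options.setStreaming(true);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Event time comes from the "timestamp" attribute of each Pub/Sub message.
        .apply(PubsubIO.Read.topic("projects/my-project/topics/new")
            .timestampLabel("timestamp"))
        // Key each "timestamp;lat;long;ele" line; a constant key is used here for brevity.
        .apply(ParDo.of(new DoFn<String, KV<String, String>>() {
          @Override
          public void processElement(ProcessContext c) {
            c.output(KV.of("track", c.element()));
          }
        }))
        // Fixed one-minute event-time windows; GroupByKey only emits a window
        // once the watermark passes the end of that window.
        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(GroupByKey.<String, String>create());

    pipeline.run();
  }
}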

An example of batch sizes and the delay between batches looks like:

36 points wait 0:02:24
36 points wait 0:02:55
18 points wait 0:00:45
05 points wait 0:00:01
36 points wait 0:00:33
36 points wait 0:00:43
36 points wait 0:00:34

A message might look like:

2013-07-07 09:34:11;47.798766;13.050133

After the first batch the watermark is empty; after the second batch I can see a watermark in the pipeline diagnostics, but it does not get updated even though new messages arrive. Also, according to Stackdriver logging, Pub/Sub has no undelivered or unacknowledged messages.

Shouldn't the watermark move forward as messages with newer event times arrive?

According to What is the watermark heuristic for PubsubIO running on GCD?, the watermark should also advance after 2 minutes without data, which it doesn't:

[..] In the case that we have not seen data on the subscription in more than two minutes (and there's no backlog), we advance the watermark to near real time. [..]

Update to address Ben's questions:

Is there a job ID that we could look at?

Yes, I just restarted the whole setup at 09:52 CET (07:52 UTC), with job ID 2017-05-05_00_49_11-11176509843641901704.

What version of the SDK are you using?

1.9.0

How are you publishing the messages with the timestamp labels?

We use a Python script that uses the Pub/Sub SDK to publish the data. A message from there might look like:

{'data': {timestamp;lat;long;ele}, 'timestamp': '2017-05-05T07:45:51Z'}

We use the timestamp attribute as the timestampLabel in Dataflow.
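
The equivalent publishing logic, sketched here in Java with the google-cloud-pubsub client purely for illustration (our actual publisher is the Python script mentioned above; project and topic names are placeholders):

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublishCoordinate {
  public static void main(String[] args) throws Exception {
    // Placeholder project/topic names.
    Publisher publisher = Publisher.newBuilder(TopicName.of("my-project", "new")).build();

    String eventTime = "2017-05-05T07:45:51Z";
    PubsubMessage message = PubsubMessage.newBuilder()
        // Payload: "timestamp;lat;long;ele" (the ele value here is made up).
        .setData(ByteString.copyFromUtf8(eventTime + ";47.798766;13.050133;430"))
        // This attribute is what timestampLabel("timestamp") in the pipeline reads.
        .putAttributes("timestamp", eventTime)
        .build();

    publisher.publish(message).get();  // block until the publish succeeds
    publisher.shutdown();
  }
}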

What is the watermark stuck at?

For this job the watermark is now stuck at 09:57:35 (I am posting this around 10:10), although new data is sent, e.g. at:

10:05:14
10:05:43
10:06:30

I can also see that we sometimes publish data to Pub/Sub with a delay of more than 10 seconds, e.g. at 10:07:47 we publish data whose highest timestamp is 10:07:26.

After a few hours the watermark catches up, but I cannot see why it is delayed / not moving in the beginning.

asked by Michael Lessiak


2 Answers

This is an edge case in the Pub/Sub watermark tracking logic that has two workarounds (see below). Essentially, if there is no input for 2 minutes, the watermark will advance to the current time. But if data is arriving faster than every 2 minutes, yet still at a very low QPS, there isn't enough data to keep the estimated watermark up to date.

As mentioned, there are two workarounds:

  1. If you process more data, the issue will naturally be resolved.
  2. Alternatively, if you inject extra messages (say 2 per second), they will provide enough data for the watermark to advance more quickly. These messages just need to have timestamps and may be immediately filtered out of the pipeline again (see the sketch below).
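
A minimal sketch of such a filter for the 1.x Java SDK, assuming the injected heartbeat messages simply carry the body "heartbeat" (the marker is an assumption; only the timestamp attribute matters for the watermark):

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

class Heartbeats {
  /** Drops the synthetic "heartbeat" messages so only real coordinates reach GroupByKey. */
  static PCollection<String> drop(PCollection<String> messages) {
    return messages.apply(ParDo.of(new DoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        // "heartbeat" as the body is only an assumption; any marker works, as long
        // as the heartbeat message still carries the "timestamp" attribute.
        if (!"heartbeat".equals(c.element())) {
          c.output(c.element());
        }
      }
    }));
  }
}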
answered by Ben Chambers


For the record, another thing to keep in mind about the previously mentioned edge case in a direct-runner context is the parallelism of the runner. Higher parallelism, which is the default especially on multicore machines, seems to require even more data. In my case, setting --targetParallelism=1 helped; it basically turned a stuck pipeline into a working one without any other intervention.
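
A minimal sketch of setting this programmatically, assuming the Apache Beam DirectRunner, where targetParallelism is a DirectOptions pipeline option (same effect as passing --targetParallelism=1 on the command line):

import org.apache.beam.runners.direct.DirectOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LocalRun {
  public static void main(String[] args) {
    // Equivalent to passing --targetParallelism=1 on the command line.
    DirectOptions options = PipelineOptionsFactory.fromArgs(args).as(DirectOptions.class);
    options.setTargetParallelism(1);
    // ... build the pipeline with these options and run it on the DirectRunner ...
  }
}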

answered by Adrian