
Google dataflow streaming pipeline is not distributing workload over several workers after windowing

I'm trying to set up a Dataflow streaming pipeline in Python. I have quite some experience with batch pipelines. Our basic architecture looks like this:

[architecture diagram]

The first step does some basic processing and takes about 2 seconds per message to get to the windowing. We are using sliding windows of 3 seconds with a 3-second interval (this might change later so that we have overlapping windows). The last step is the SOG prediction, which takes about 15 seconds to process and is clearly our bottleneck transform.

The issue we face is that the workload is perfectly distributed over our workers before the windowing, but the most important transform is not distributed at all. All the windows are processed one at a time, seemingly on a single worker, while we have 50 available.

The logs show that the SOG prediction step produces an output only once every ~15 seconds, which should not be the case if the windows were being processed across more workers. This builds up huge latency over time, which we don't want: with 1 minute of messages, the last window has a latency of 5 minutes, whereas with proper distribution it should be only around 15 seconds (the SOG prediction time). At this point we are clueless.


Does anyone see something wrong with our code, or know how to prevent or circumvent this? It seems like something happening in the internals of Google Cloud Dataflow. Does this also occur in Java streaming pipelines?

In batch mode everything works fine. There, one could do a reshuffle to make sure no fusion occurs (see the sketch below), but that does not seem possible after windowing in streaming.
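Roughly what that reshuffle looks like in batch mode (a minimal sketch; beam.Reshuffle() is the standard Beam transform for breaking fusion, and the surrounding transform names just mirror the pipeline below):

import apache_beam as beam

# Batch mode: force a redistribution of elements across workers so the
# expensive prediction step is not fused onto the same worker as the
# preceding preprocessing steps.
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(pred_bbox_tfserv_fn, SERVER_URL)
redistributed = bboxs | 'BreakFusion' >> beam.Reshuffle()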

import logging
import sys

import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount

# parse_arguments, get_pipeline_options, the *_fn transforms and constants
# such as SUBSCRIPTION_PATH and SERVER_URL come from our own modules.
args = parse_arguments(sys.argv if argv is None else argv)
pipeline_options = get_pipeline_options(project=args.project_id,
                                        job_name='XX',
                                        num_workers=args.workers,
                                        max_num_workers=MAX_NUM_WORKERS,
                                        disk_size_gb=DISK_SIZE_GB,
                                        local=args.local,
                                        streaming=args.streaming)

pipeline = beam.Pipeline(options=pipeline_options)

# Build pipeline
# pylint: disable=C0330
if args.streaming:
    frames = (pipeline | 'ReadFromPubsub' >> beam.io.ReadFromPubSub(
        subscription=SUBSCRIPTION_PATH,
        with_attributes=True,
        timestamp_attribute='timestamp'
    ))

    frame_tpl = frames | 'CreateFrameTuples' >> beam.Map(
        create_frame_tuples_fn)

crops = frame_tpl | 'MakeCrops' >> beam.Map(make_crops_fn, NR_CROPS)
bboxs = crops | 'bounding boxes tfserv' >> beam.Map(
    pred_bbox_tfserv_fn, SERVER_URL)

sliding_windows = bboxs | 'Window' >> beam.WindowInto(
                  beam.window.SlidingWindows(
                        FEATURE_WINDOWS['goal']['window_size'],
                        FEATURE_WINDOWS['goal']['window_interval']),
                  trigger=AfterCount(30),
                  accumulation_mode=AccumulationMode.DISCARDING)

# GROUPBYKEY (per match)
group_per_match = sliding_windows | 'Group' >> beam.GroupByKey()
_ = group_per_match | 'LogPerMatch' >> beam.Map(
    lambda x: logging.info("window per match per timewindow: # %s, %s",
                           str(len(x[1])), x[1][0]['timestamp']))

sog = sliding_windows | 'Predict SOG' >> beam.Map(predict_sog_fn,
                                                  SERVER_URL_INCEPTION,
                                                  SERVER_URL_SOG)

pipeline.run().wait_until_finish()
asked Feb 19 '19 by Brecht Coghe


2 Answers

In Beam the unit of parallelism is the key: all the windows for a given key will be produced on the same machine. However, if you have 50+ keys, they should get distributed among all workers.
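For illustration, one common way to get more keys is to shard ("fan out") the existing key before windowing. This is only a sketch: it assumes the elements are (key, value) tuples, and NUM_SHARDS and add_shard_key are not part of the original pipeline.

import random

import apache_beam as beam

NUM_SHARDS = 50  # assumption: roughly one shard per available worker

def add_shard_key(element):
    # (match_id, payload) -> ((match_id, shard_id), payload)
    # Turning one logical key into several sub-keys lets windows of the
    # same match be processed on different workers.
    key, value = element
    return ((key, random.randint(0, NUM_SHARDS - 1)), value)

sharded = bboxs | 'AddShardKey' >> beam.Map(add_shard_key)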

You mentioned that you were unable to add a Reshuffle in streaming. This should be possible; if you're getting errors, please file a bug at https://issues.apache.org/jira/projects/BEAM/issues. Does re-windowing into GlobalWindows make the issue with reshuffling go away?
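A sketch of that suggestion, reusing the names from the question: re-window into the global window, reshuffle, then re-apply the sliding windows (element timestamps are preserved, so the second WindowInto assigns the same windows as before).

import apache_beam as beam

reshuffled = (
    sliding_windows
    | 'ToGlobalWindow' >> beam.WindowInto(beam.window.GlobalWindows())
    | 'BreakFusion' >> beam.Reshuffle()
    | 'BackToSliding' >> beam.WindowInto(
        beam.window.SlidingWindows(
            FEATURE_WINDOWS['goal']['window_size'],
            FEATURE_WINDOWS['goal']['window_interval'])))

sog = reshuffled | 'Predict SOG' >> beam.Map(
    predict_sog_fn, SERVER_URL_INCEPTION, SERVER_URL_SOG)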

answered by robertwb


It looks like you do not necessarily need GroupByKey, because you are always grouping on the same key. Instead you could use CombineGlobally to append all the elements inside the window, rather than a GroupByKey that always uses the same key.

combined = values | beam.CombineGlobally(append_fn).without_defaults()
combined | beam.ParDo(PostProcessFn())

I am not sure how load distribution works when using CombineGlobally, but since it does not process key/value pairs I would expect another mechanism to handle the load distribution.
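For completeness, a sketch of how the append_fn above could be written as a CombineFn that simply collects every element of a window into one list; the class name and its use on sliding_windows are illustrative, not from the original post.

import apache_beam as beam

class AppendFn(beam.CombineFn):
    # Collect every element of a window into a single list.
    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = []
        for acc in accumulators:
            merged.extend(acc)
        return merged

    def extract_output(self, accumulator):
        return accumulator

combined = sliding_windows | 'CollectWindow' >> beam.CombineGlobally(
    AppendFn()).without_defaults()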

answered by Sven.DG