
How to combine streaming data with large history data set in Dataflow/Beam

I am investigating processing logs from web user sessions via Google Dataflow/Apache Beam and need to combine the user's logs as they come in (streaming) with the history of a user's session from the last month.

I have looked at the following approaches:

  1. Use a 30-day fixed window: most likely too large a window to fit into memory, and I do not need to update the user's history, just refer to it
  2. Use CoGroupByKey to join the two data sets, but the two data sets must have the same window size (https://cloud.google.com/dataflow/model/group-by-key#join), which isn't true in my case (24h vs 30 days)
  3. Use a Side Input to retrieve the user's session history for a given element in processElement(ProcessContext processContext)
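To see why approach 2 needs matching windows, the per-key grouping CoGroupByKey performs can be illustrated with plain Java collections (a hand-rolled sketch, not the Beam API; `joinByUser` is a hypothetical helper):

```java
import java.util.*;

// Illustration of CoGroupByKey semantics: for each key, collect the
// matching elements from both inputs. In Beam, the two PCollections
// must share the same windowing, because elements are only co-grouped
// when their windows line up.
public class CoGroupSketch {
    static Map<String, List<List<String>>> joinByUser(
            Map<String, List<String>> current,
            Map<String, List<String>> history) {
        Map<String, List<List<String>>> joined = new HashMap<>();
        Set<String> users = new HashSet<>(current.keySet());
        users.addAll(history.keySet());
        for (String user : users) {
            joined.put(user, Arrays.asList(
                current.getOrDefault(user, Collections.emptyList()),
                history.getOrDefault(user, Collections.emptyList())));
        }
        return joined;
    }
}
```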

My understanding is that the data loaded via .withSideInputs(pCollectionView) needs to fit into memory. I know I can fit all of a single user's session history into memory, but not all session histories.

My question is whether there is a way to load/stream data from a side input that is only relevant to the current user session.

I am imagining a ParDo function that would load the user's session history from the side input by specifying the user's ID. Only the current user's session history would fit into memory; loading all session histories through the side input would be too large.

Some pseudo code to illustrate:

public static class MetricFn extends DoFn<LogLine, String> {

    // View over the 30-day history, keyed by user ID.
    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        // Problem: this materializes ALL users' histories in memory,
        // when only the current user's entry is needed.
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}
Florian asked Apr 29 '16




1 Answer

There is not currently a way of accessing per-key side inputs in streaming, but it would definitely be useful exactly as you describe, and it is something we are considering implementing.

One possible workaround is to use the side input to distribute pointers to the actual session histories. The code generating the 24h session histories could upload them to GCS/BigQuery/etc., then send their locations as a side input to the joining code.
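A minimal sketch of that indirection, in plain Java rather than the Beam API (the map of userId to storage location stands in for the side input; `fetchHistory` is a hypothetical loader you would back with GCS or BigQuery):

```java
import java.util.*;
import java.util.function.Function;

// Sketch of the pointer-indirection workaround: the side input carries
// only a small map of userId -> storage location, and the per-element
// code fetches just that one user's history on demand.
public class HistoryPointerSketch {
    static List<String> loadUserHistory(
            Map<String, String> pointerSideInput,          // userId -> e.g. "gs://bucket/u1.json"
            String userId,
            Function<String, List<String>> fetchHistory) { // hypothetical GCS/BigQuery reader
        String location = pointerSideInput.get(userId);
        if (location == null) {
            return Collections.emptyList(); // no history recorded for this user
        }
        return fetchHistory.apply(location);
    }
}
```

This keeps the side input small (one pointer per user) while the large per-user payloads live in external storage.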

danielm answered Oct 03 '22