
How to get the end of window timestamp in Apache Beam Python

I'm creating sliding time windows 20 seconds long every 5 seconds from batched log data:

    rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))

    # set timestamp field used for windowing and set 20 second long window every 5 seconds
    ts_rows = (rows | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
                    | 'set window' >> beam.WindowInto(window.SlidingWindows(20,5)))

    # extract only user id and relevant data, group and process
    rows_with_data = (ts_rows | 'extract data' >> beam.FlatMap(lambda row: 
                                [(str(row['user_id']),[row['data1'], row['data2'],row['data3']])])
                              | 'group by user id' >> beam.GroupByKey()
                              | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))

How can I access the timestamp information for each window in Python? (An answer for Java is here but I don't know how to translate it into Python: How to get the max timestamp of the current sliding window) Ideally I'd want the end time of each window rather than the max or min timestamp of the data within the window.

Mike Keyes asked Sep 15 '17 13:09


1 Answer

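The Python counterpart of the window parameter mentioned on the page you linked is window=beam.DoFn.WindowParam. Declare it as a keyword argument with that default on your DoFn's process method, and Beam passes in the window the current element belongs to.

The window end time is then available as window.end (and the start as window.start). In Python, you can access it like this: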

Define your DoFn:

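    import apache_beam as beam


    class BuildRecordFn(beam.DoFn):
        def __init__(self):
            super(BuildRecordFn, self).__init__()

        def process(self, element, window=beam.DoFn.WindowParam):
            # window is the window this element was assigned to.
            # window_start = window.start.to_utc_datetime()
            window_end = window.end.to_utc_datetime()
            # element is a tuple here, so the window end is appended as an extra field.
            return [element + (window_end,)]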
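With sliding windows, each element lands in several overlapping windows, so the DoFn sees a different `window.end` for each of them. The plain-Python sketch below works out which end times to expect for the 20-second windows every 5 seconds from the question. It is only an illustration of the arithmetic, not Beam API: the helper name `sliding_window_ends` is hypothetical, and it assumes whole-second timestamps and windows aligned to an offset of 0.

```python
def sliding_window_ends(timestamp, size=20, period=5, offset=0):
    """Return the end times (in seconds) of every sliding window containing `timestamp`.

    Assumes integer timestamps; `size` and `period` are in seconds.
    """
    # Start of the latest window that begins at or before the timestamp.
    last_start = timestamp - (timestamp - offset) % period
    # Step back one period at a time while the window still covers the timestamp.
    starts = range(last_start, last_start - size, -period)
    return sorted(start + size for start in starts)


print(sliding_window_ends(3))   # [5, 10, 15, 20]
print(sliding_window_ends(42))  # [45, 50, 55, 60]
```

An event at second 42 therefore shows up in four windows, and the DoFn would be called once per window after grouping, each time with a different end time.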

Then use it like this:

    from apache_beam.io import ReadFromText, WriteToText


    def format_result(xs):
        # Join all fields of the record into one comma-separated line.
        return ','.join(str(x) for x in xs)


    # p, known_args and ParseEventFn are defined elsewhere in the pipeline script.
    lines = p | ReadFromText(known_args.input)
    counts = (
        lines
        | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
        | 'AddEventTimestamp' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | 'ExtractObjectID' >> beam.Map(lambda elem: elem['objectID'])
        # 60-second fixed windows
        | 'FixedWindow' >> beam.WindowInto(beam.window.FixedWindows(60))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
        | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
        | 'Format' >> beam.Map(format_result)
        | WriteToText(known_args.output))

x97Core answered Oct 06 '22 00:10