I'm creating sliding time windows 20 seconds long every 5 seconds from batched log data:

    rows = p | 'read events' >> beam.io.Read(beam.io.BigQuerySource(query=query))

    # set timestamp field used for windowing and set 20 second long window every 5 seconds
    ts_rows = (rows
               | 'set timestamp' >> beam.ParDo(AddTimestampDoFn())
               | 'set window' >> beam.WindowInto(window.SlidingWindows(20, 5)))

    # extract only user id and relevant data, group and process
    rows_with_data = (ts_rows
                      | 'extract data' >> beam.FlatMap(lambda row:
                          [(str(row['user_id']), [row['data1'], row['data2'], row['data3']])])
                      | 'group by user id' >> beam.GroupByKey()
                      | 'Process window' >> beam.ParDo(WindowDataProcessingDoFn()))

How can I access the timestamp information for each window in Python? (There is an answer for Java under "How to get the max timestamp of the current sliding window", but I don't know how to translate it into Python.) Ideally I want the end time of each window rather than the max or min timestamp of the data within the window.
DoFn is a Beam SDK class that describes a distributed processing function.
ParDo is the computational pattern of per-element computation. It has some variations, but you don't need to worry about them for this question. The DoFn is the logic that is applied to each element.
A PTransform is an object describing (not executing) a computation. The actual execution semantics of a transform are captured by a runner object. A transform object always belongs to a pipeline object.
Each pipeline represents a single, repeatable job. A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline's data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline.
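I went to the link you provided. Note that window=beam.DoFn.WindowParam corresponds to the window parameter mentioned on the page you linked. When you declare it as a default argument of process, Beam passes in the window the element belongs to, and the window end time is then available as window.end. In Python, you can access it like this: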
Define your DoFn:

    import apache_beam as beam

    class BuildRecordFn(beam.DoFn):
        def __init__(self):
            super(BuildRecordFn, self).__init__()

        def process(self, element, window=beam.DoFn.WindowParam):
            # window_start = window.start.to_utc_datetime()
            window_end = window.end.to_utc_datetime()
            # element is a tuple here, so append the window end to it
            return [element + (window_end,)]
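
One thing to keep in mind for the sliding windows in the question: with a 20-second size and a 5-second period, every element belongs to four overlapping windows, so a DoFn placed after GroupByKey is invoked once per key per window, each time with a different window.end. The following pure-Python sketch (no Beam required) reproduces the assignment arithmetic as I understand it; the function name sliding_window_bounds is made up for illustration and is not part of the Beam API.

```python
def sliding_window_bounds(timestamp, size, period, offset=0):
    """Return (start, end) for every sliding window that contains `timestamp`.

    Windows are half-open intervals [start, end) whose starts are aligned to
    offset + k * period. All arguments are in seconds.
    """
    # Latest aligned window start that is <= timestamp.
    start = timestamp - (timestamp - offset) % period
    bounds = []
    # Walk backwards while the window still covers the timestamp.
    while start > timestamp - size:
        bounds.append((start, start + size))
        start -= period
    return bounds


# An event at t = 12 s with 20-second windows starting every 5 seconds.
for start, end in sliding_window_bounds(12, size=20, period=5):
    print(f"window [{start}, {end}) ends at {end}")
```

Each end value here is the exclusive upper bound of its window, which is what window.end reports, independent of the timestamps of the elements that happen to fall inside it.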
Then use BuildRecordFn in a pipeline like this:

    import apache_beam as beam
    from apache_beam.io import ReadFromText, WriteToText

    lines = p | ReadFromText(known_args.input)
    counts = (
        lines
        | 'ParseEventFn' >> beam.ParDo(ParseEventFn())
        | 'AddEventTimestamp' >> beam.Map(
            lambda elem: beam.window.TimestampedValue(elem, elem['timestamp']))
        | 'ExtractObjectID' >> beam.Map(lambda elem: elem["objectID"])
        | 'FixedWindow' >> beam.WindowInto(
            beam.window.FixedWindows(60 * 1))
        | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
        | 'GroupAndSum' >> beam.CombinePerKey(sum)
        | 'AddWindowEndTimestamp' >> beam.ParDo(BuildRecordFn())
        | 'Format' >> beam.Map(format_result)
        | WriteToText(known_args.output))

where format_result is defined (before the pipeline is built) as:

    def format_result(xs):
        ys = [str(x) for x in xs]
        return ','.join(ys)