Merge two PCollection (Apache beam)

I have two files in Cloud Storage. File1 is in Avro format and contains data from a temperature sensor.

time_stamp     |  Temperature
1000           |  T1
2000           |  T2
3000           |  T3
4000           |  T3
5000           |  T4
6000           |  T5

File2 is in Avro format and contains data from a wind sensor.

time_stamp     |  wind_speed
500            |  w1
1200           |  w2
1500           |  w3
2200           |  w4
2500           |  w5
3000           |  w6

I want to combine them to produce output like below:

time_stamp |Temperature|wind_speed
1000       |T1         |w1 (latest wind reading at or before 1000, taken at 500)
2000       |T2         |w3 (latest wind reading at or before 2000, taken at 1500)
3000       |T3         |w6 (wind reading exactly at 3000)
4000       |T3         |w6 (latest wind reading at or before 4000, taken at 3000)
5000       |T4         |w6 (latest wind reading at or before 5000, taken at 3000)
6000       |T5         |w6 (latest wind reading at or before 6000, taken at 3000)

I am looking for a solution in Apache Beam to combine the above files. Right now the data is read from files, but in the future it may arrive via Pub/Sub. I want to find a custom way of combining the two PCollections to create another PCollection, tempDataWithWindSpeed.

     PCollection<Temperature> tempData = p.apply(AvroIO
         .read(Temperature.class)
         .from("gs://my_bucket/path/to/temp-sensor-data.avro"));

     PCollection<WindSpeed> windData = p.apply(AvroIO
         .read(WindSpeed.class)
         .from("gs://my_bucket/path/to/wind-sensor-data.avro"));

     PCollection<WindSpeed> tempDataWithWindSpeed = ?
asked Sep 01 '25 by pinakin
1 Answer

The comment by @jszule is a good answer in general for Dataflow/Beam: the best-supported join is when the two PCollections share a common key. For most data, Beam can infer a schema and you can use the CoGroup.join transform. The design decision you have to make is how to derive the keys, for example rounding each timestamp down to the nearest 1000.
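As an illustration of that key choice (this helper is an assumption, not part of the original answer), rounding a timestamp down to the nearest bucket boundary gives both sensor streams a common join key:

```java
public class TimestampKeys {
    // Round a (non-negative) timestamp down to the nearest bucket boundary
    // so that readings from both sensors land on a common join key.
    static long bucketKey(long timestampMillis, long bucketSizeMillis) {
        return timestampMillis - (timestampMillis % bucketSizeMillis);
    }

    public static void main(String[] args) {
        // Wind readings at 500, 1200, 2500 map to buckets 0, 1000, 2000.
        System.out.println(bucketKey(500, 1000));   // 0
        System.out.println(bucketKey(1200, 1000));  // 1000
        System.out.println(bucketKey(2500, 1000));  // 2000
    }
}
```

Each element would then be keyed by this bucket (e.g. via WithKeys) before the join.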

Your use case has a complication: you need to carry forward values in a time series for keys that have no data. The solution is to use state and timers to generate the "missing" values. You will still need to choose keys carefully, since state and timers are per key and window. State and timers also work in batch mode, so this is a unified batch/streaming solution.
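The per-key logic such a stateful DoFn would maintain is an "as-of" join: for each temperature timestamp, pick the latest wind reading at or before it. A minimal plain-Java sketch of just that logic (illustrative only, outside Beam; the method and variable names are assumptions):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

public class AsOfJoin {
    // For each temperature timestamp, look up the latest wind reading
    // whose timestamp is less than or equal to it (an "as-of" join).
    static Map<Long, String> merge(TreeMap<Long, String> wind,
                                   TreeMap<Long, String> temp) {
        Map<Long, String> out = new LinkedHashMap<>();
        for (Map.Entry<Long, String> t : temp.entrySet()) {
            Map.Entry<Long, String> w = wind.floorEntry(t.getKey());
            String windValue = (w == null) ? null : w.getValue();
            out.put(t.getKey(), t.getValue() + "|" + windValue);
        }
        return out;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> wind = new TreeMap<>();
        wind.put(500L, "w1"); wind.put(1200L, "w2"); wind.put(1500L, "w3");
        wind.put(2200L, "w4"); wind.put(2500L, "w5"); wind.put(3000L, "w6");
        TreeMap<Long, String> temp = new TreeMap<>();
        temp.put(1000L, "T1"); temp.put(2000L, "T2"); temp.put(3000L, "T3");
        temp.put(4000L, "T3"); temp.put(5000L, "T4"); temp.put(6000L, "T5");
        System.out.println(merge(wind, temp));
        // {1000=T1|w1, 2000=T2|w3, 3000=T3|w6, 4000=T3|w6, 5000=T4|w6, 6000=T5|w6}
    }
}
```

In the streaming version, the TreeMap becomes per-key Beam state (e.g. a ValueState holding the latest wind reading), updated as wind elements arrive and read when temperature elements fire.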

You may want to read this blog post by Reza Rokni and me on the subject, or this talk by Reza at Beam Summit Berlin 2019.

answered Sep 04 '25 by Kenn Knowles