Can I use Flink state to perform a join?

I am evaluating Apache Flink for stream processing as a replacement for, or complement to, Apache Spark. One of the tasks we usually solve with Spark is data enrichment.

That is, I have a stream of data from IoT sensors, where each record carries a sensor ID, and I have a set of sensor metadata. I want to transform the input stream into a stream of sensor measurement plus sensor metadata.

In Spark I can join a DStream with an RDD.

case class SensorValue(sensorId: Long, ...)
case class SensorMetadata(sensorId: Long, ...)
val sensorInput: DStream[SensorValue] = readEventsFromKafka()
// Static metadata, keyed by sensor ID
val staticMetadata: RDD[(Long, SensorMetadata)] =
  spark.read.json(...).as[SensorMetadata]
    .map { s => (s.sensorId, s) }.rdd
val joined: DStream[(SensorValue, SensorMetadata)] =
  sensorInput.map { s => (s.sensorId, s) }.transform { rdd: RDD[(Long, SensorValue)] =>
    rdd.join(staticMetadata)
      .map { case (_, (s, m)) => (s, m) } // Get rid of nested tuple
  }

Can I do the same trick with Apache Flink? I see no direct API for this. The only idea I have is to use a stateful transformation: I can merge metadata and sensor events into a single stream and use Flink state storage to hold the metadata (pseudocode):

val sensorInput: DataStream[SensorValue] = readEventsFromKafka()
val staticMetadata: DataStream[SensorMetadata] = readMetadataFromJson()
val result: DataStream[(SensorValue, SensorMetadata)] =
  sensorInput.keyBy("sensorId")
    .connect(staticMetadata.keyBy("sensorId"))
    .flatMap(new RichCoFlatMapFunction[SensorValue, SensorMetadata, (SensorValue, SensorMetadata)] {
      private var md: ValueState[SensorMetadata] = _
      override def open(parameters: Configuration): Unit = ??? // initialize the value state
      // Sensor event: emit it together with the metadata stored for its key
      override def flatMap1(s: SensorValue, out: Collector[(SensorValue, SensorMetadata)]): Unit =
        out.collect((s, md.value))
      // Metadata record: store it in keyed state
      override def flatMap2(m: SensorMetadata, out: Collector[(SensorValue, SensorMetadata)]): Unit =
        md.update(m)
    })

Is this the correct approach? Can I use it at a larger scale, when the metadata doesn't fit on one machine?

Thanks

Yura Taras asked Oct 18 '16 06:10


1 Answer

Using a CoFlatMapFunction to join is a common approach. However, it has one significant drawback: the function is called whenever a record arrives on either input, and you cannot control which input is consumed first. So at the beginning you will have to handle sensor events before the metadata has been completely read. One workaround is to buffer all events of one input until the other input has been consumed. On the other hand, the CoFlatMapFunction approach has the benefit that you can update the metadata dynamically.

In your code example, both inputs are keyed on the join key. That means the input is partitioned and each task slot processes a different set of keys. Hence, your metadata can be larger than what a single machine can handle (if you configure the RocksDB state backend, the state can be persisted to disk, so you are not even bound by the size of the memory).
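The buffering idea can be sketched roughly as follows. This is a per-key variant: an event is held back only until metadata for its own sensor ID has arrived. It assumes the Flink 1.x DataStream API (where `open(Configuration)` is available). The class name `BufferingEnricher` and the fields `reading` and `location` are hypothetical, since the real fields are elided in the question.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical record types; only sensorId comes from the question.
case class SensorValue(sensorId: Long, reading: Double)
case class SensorMetadata(sensorId: Long, location: String)

// Must run on connected streams that are both keyed by sensorId,
// because it relies on keyed state.
class BufferingEnricher
    extends RichCoFlatMapFunction[SensorValue, SensorMetadata, (SensorValue, SensorMetadata)] {

  // Latest metadata seen for the current key
  @transient private var metadata: ValueState[SensorMetadata] = _
  // Events that arrived before any metadata for the current key
  @transient private var pending: ListState[SensorValue] = _

  override def open(parameters: Configuration): Unit = {
    metadata = getRuntimeContext.getState(
      new ValueStateDescriptor[SensorMetadata]("metadata", classOf[SensorMetadata]))
    pending = getRuntimeContext.getListState(
      new ListStateDescriptor[SensorValue]("pending", classOf[SensorValue]))
  }

  // Sensor event: join immediately if metadata is known, otherwise buffer it.
  override def flatMap1(event: SensorValue,
                        out: Collector[(SensorValue, SensorMetadata)]): Unit = {
    val md = metadata.value()
    if (md != null) out.collect((event, md))
    else pending.add(event)
  }

  // Metadata record: store it, then flush any events buffered for this key.
  override def flatMap2(md: SensorMetadata,
                        out: Collector[(SensorValue, SensorMetadata)]): Unit = {
    metadata.update(md)
    // Depending on the Flink version, get() may return null for empty state.
    Option(pending.get()).foreach(_.asScala.foreach(e => out.collect((e, md))))
    pending.clear()
  }
}
```

It would be plugged in where the question's anonymous function sits, after keying and connecting both streams. Note that events for a sensor whose metadata never arrives stay buffered indefinitely, so a real job would probably need a timeout or a size limit.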

If you require that all metadata be present when the job starts, and the metadata is static (it does not change) and small enough to fit on one machine, you can also use a regular FlatMapFunction (a RichFlatMapFunction, which provides open()) and load the metadata from a file in the open() method. In contrast to your approach, this would be a broadcast join, where each task slot holds the complete metadata in memory. Besides having all metadata available when the event data is consumed, this approach has the benefit that you do not need to shuffle the event data, because it can be joined on any machine.
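A minimal sketch of this broadcast-style variant, again assuming the Flink 1.x API. The class name `StaticMetadataEnricher`, the fields `reading` and `location`, and the `id,location` line format of the metadata file are all assumptions made for illustration. The question's metadata is JSON and would need a real parser.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.io.Source

// Hypothetical record types; only sensorId comes from the question.
case class SensorValue(sensorId: Long, reading: Double)
case class SensorMetadata(sensorId: Long, location: String)

// metadataPath must be readable from every machine that runs a task,
// because each parallel instance loads its own full copy.
class StaticMetadataEnricher(metadataPath: String)
    extends RichFlatMapFunction[SensorValue, (SensorValue, SensorMetadata)] {

  // Complete metadata, held in memory by each parallel instance
  @transient private var metadata: Map[Long, SensorMetadata] = _

  // Called once per parallel instance before any event is processed.
  override def open(parameters: Configuration): Unit = {
    val source = Source.fromFile(metadataPath)
    try {
      // Assumed format: one "sensorId,location" pair per line
      metadata = source.getLines()
        .map(_.split(",", 2))
        .collect { case Array(id, location) =>
          id.trim.toLong -> SensorMetadata(id.trim.toLong, location.trim)
        }
        .toMap
    } finally source.close()
  }

  // Events whose sensor ID has no metadata entry are dropped.
  override def flatMap(event: SensorValue,
                       out: Collector[(SensorValue, SensorMetadata)]): Unit =
    metadata.get(event.sensorId).foreach(md => out.collect((event, md)))
}
```

Because no keyed state is involved, the function can be applied directly to the unkeyed event stream, which is what avoids the shuffle.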

Fabian Hueske answered Oct 10 '22 09:10
