
How to look up and update the state of a record from a database in Apache Flink?

I am working on a data streaming application and I am investigating the possibility of using Apache Flink for this project. The main reason for this is that it supports nice high-level streaming constructs, very similar to Java 8's Stream API.

I will be receiving events which correspond to a specific record in a database, and I want to be able to process these events (coming from a message broker such as RabbitMQ or Kafka) and eventually update the records in the database and push the processed/transformed events to another sink (probably another message broker).

Events related to a specific record will ideally need to be processed in FIFO ordering (although there will be a timestamp which helps detect out of order events too), but events related to different records can be processed in parallel. I was planning to use the keyBy() construct to partition the stream by record.
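The timestamp check mentioned above could look roughly like the following. This is a plain-Java sketch, not Flink API: the class `OutOfOrderDetector`, the method `accept`, and the use of a `String` id with a `long` timestamp are all assumptions made for illustration. In a Flink job the map would most likely be replaced by keyed state on the stream produced by keyBy(), so each key only ever sees its own timestamp.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks the newest timestamp seen per record id. */
class OutOfOrderDetector {
    // Newest event timestamp accepted so far for each record id.
    private final Map<String, Long> lastSeen = new HashMap<>();

    /**
     * Returns true if the event is in order for its record, i.e. its timestamp
     * is not older than the newest one accepted for that id, and remembers it.
     * Returns false for an out-of-order event and leaves the stored value unchanged.
     */
    boolean accept(String recordId, long timestamp) {
        Long previous = lastSeen.get(recordId);
        if (previous != null && timestamp < previous) {
            return false; // older than something already processed for this record
        }
        lastSeen.put(recordId, timestamp);
        return true;
    }
}
```

Events for different ids never affect each other here, which mirrors the requirement that only events on the same record need ordering.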

The processing that needs to be done depends on the current information in the database about the record. However, I am unable to find an example or a recommended approach for querying a database for such records in order to enrich the event being processed with the additional information I need.

The pipeline I have in mind is as follows:

-> keyBy() on the id received -> retrieve the record from the database corresponding to the id -> perform processing steps on the record -> push the processed event to an external queue and update the database record

The database record will need to be updated because another application will be querying the data.

There might be additional optimisations one could do after this pipeline is achieved. For example one could cache the (updated) record in a managed state so that the next event on the same record will not need another database query. However, if the application does not know about a specific record it will need to retrieve it from the database.
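The caching idea in the previous paragraph can be sketched in plain Java as a read-through, write-through cache. Everything here is hypothetical: `RecordEnricher`, its `process` method, and the string-append "processing" are placeholders, one `Map` stands in for the real database, and a second `Map` stands in for what would be managed keyed state in Flink.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: cache records locally, fall back to the database on a miss. */
class RecordEnricher {
    private final Map<String, String> database;                 // stand-in for the real database
    private final Map<String, String> cache = new HashMap<>();  // stand-in for managed state
    int databaseReads = 0;                                      // counts lookups that hit the database

    RecordEnricher(Map<String, String> database) {
        this.database = database;
    }

    /** Processes one event for a record and returns the updated record. */
    String process(String recordId, String event) {
        String record = cache.get(recordId);
        if (record == null) {
            // Unknown to this instance: retrieve it from the database (may still be null).
            record = database.get(recordId);
            databaseReads++;
        }
        // Placeholder processing step: append the event to the record.
        String updated = (record == null) ? event : record + "," + event;
        database.put(recordId, updated); // write back so other applications see the change
        cache.put(recordId, updated);    // the next event on this record skips the lookup
        return updated;
    }
}
```

This only stays correct while this job is the sole writer of the records; if another application also updated them, the cached copy could go stale.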

What is the best approach to use for this kind of scenario in Apache Flink?

jbx asked Aug 10 '16 06:08




1 Answer

You can perform the database lookup by extending a rich function, e.g. a RichFlatMapFunction: initialize the database connection once in its open() method, and then process each event in the flatMap() method:

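import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Declared as a static nested class inside the job class
public static class DatabaseMapper extends RichFlatMapFunction<Event, EnrichedEvent> {

    // Declare the DB connection and query statements here

    @Override
    public void open(Configuration parameters) throws Exception {
      // Called once per parallel instance, before any event is processed:
      // initialize the database connection and prepare the query statements
    }

    @Override
    public void flatMap(Event currentEvent, Collector<EnrichedEvent> out) throws Exception {
      // Look up the database, update the record, and build enrichedEvent
      out.collect(enrichedEvent);
    }

    @Override
    public void close() throws Exception {
      // Release the database connection
    }
}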

And then you can use DatabaseMapper as follows:

stream.keyBy(event -> event.getId())   // getId() is an assumed accessor for the record id
      .flatMap(new DatabaseMapper())
      .addSink(..);

You can find here an example using cached data from Redis.

Yassine Marzougui answered Oct 12 '22 02:10
