I have three keyed data streams of different types.
DataStream<A> first;
DataStream<B> second;
DataStream<C> third;
Each stream has its own processing logic, and the three share state. I want to connect these three streams so that the respective processing function is triggered whenever data is available in any of them. Connecting two streams is possible:
first.connect(second).process(<CoProcessFunction>)
I can't use union (which accepts multiple data streams) because the types are different. I want to avoid creating a wrapper and converting all the streams into the same type.
The wrapper approach isn't too bad, really. You can create an EitherOfThree<T1, T2, T3> wrapper class that's similar to Flink's existing Either<Left, Right>, and then process a stream of those records in a single function. Something like:
// With lambdas, Flink may need explicit type hints (.returns(...)) because of type erasure.
DataStream<EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
    .union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
    .union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
combo.process(new MyProcessFunction());
Flink's Either class has a more elegant implementation, but for your use case something simple should work.
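For illustration, here is a minimal sketch of what such a wrapper might look like in plain Java. EitherOfThree is not part of Flink. The three-argument constructor is assumed from the snippet above, while the isFirst/isSecond/isThird checks and the fold helper are hypothetical additions. The sketch also ignores how Flink would serialize the class.

```java
import java.util.function.Function;

/** Hypothetical three-way tagged wrapper: exactly one of the three slots is non-null. */
final class EitherOfThree<T1, T2, T3> {
    private final T1 first;
    private final T2 second;
    private final T3 third;

    // Constructor shape assumed from the snippet above: (value, null, null) and so on.
    EitherOfThree(T1 first, T2 second, T3 third) {
        int set = (first != null ? 1 : 0) + (second != null ? 1 : 0) + (third != null ? 1 : 0);
        if (set != 1) {
            throw new IllegalArgumentException("exactly one value must be non-null, got " + set);
        }
        this.first = first;
        this.second = second;
        this.third = third;
    }

    boolean isFirst()  { return first != null; }
    boolean isSecond() { return second != null; }
    boolean isThird()  { return third != null; }

    /** Applies the handler that matches the populated slot and returns its result. */
    <R> R fold(Function<? super T1, ? extends R> onFirst,
               Function<? super T2, ? extends R> onSecond,
               Function<? super T3, ? extends R> onThird) {
        if (first != null) {
            return onFirst.apply(first);
        }
        if (second != null) {
            return onSecond.apply(second);
        }
        return onThird.apply(third);
    }
}
```

Inside the single process function, each incoming record could then be routed to the logic for its original stream with something like record.fold(this::handleA, this::handleB, this::handleC), where the three handlers are placeholders for your own per-stream processing and can all read and update the same state.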
Other than union, the standard approach is to use connect in a cascade, e.g.,
first.connect(second).process(...).connect(third).process(...)
You won't be able to share state between all three streams in one place. You can have the first process function output whatever the subsequent process function will need, but the third stream won't be able to affect the state in the first process function, which is a problem for some use cases.
Another possibility might be to leverage a lower-level mechanism -- see FLIP-92: Add N-Ary Stream Operator in Flink. However, this mechanism is intended for internal use (the Table/SQL API uses this for n-way joins), and would need to be treated with caution. See the mailing list discussion for details. I mention this for completeness, but I'm skeptical this is a good idea until the interface is further developed.
You might also want to look at the Stateful Functions API, which overcomes many of the restrictions of the DataStream API.