I have implemented a SourceFunction that fetches data (a String) from a URL. I then call keyBy() on that data and apply a 10-minute window. However, the SourceFunction is called only once, and the windows operate on that single result for 10 minutes. How can I get data continuously from the SourceFunction?
DataStream<String> stream = env.addSource(new MySource()) // This runs only once
.keyBy(some keyby function)
.window(for 10 minutes) // This runs for 10 minutes for the data obtained once by Source function
.process(some process function)
I want to run the SourceFunction repeatedly at a fixed time interval and let the window work on the continuously fetched data.
Your SourceFunction's run() method should contain a loop that does the work and then sleeps (or uses whatever other scheduling mechanism you prefer) before the next iteration.
A common pattern is to use some sort of atomic or volatile boolean that is set to true when run() is first called and set to false when cancel() is called.
So you have something like this in your run method:
while (running) {
// fetch some data, can be async
ctx.collect(data);
Thread.sleep(period);
}
You can do that part however you see fit, but the important thing is that you do not exit the run() method of your SourceFunction until you are actually done or have been cancelled.
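To make the run/cancel pattern concrete, here is a minimal sketch that compiles without Flink on the classpath. The class name PollingSource, the fetcher supplier, and periodMillis are all hypothetical, and a plain Consumer<String> stands in for Flink's SourceContext<String>. In a real job the class would implement SourceFunction<String>, and the loop body would call ctx.collect(data) instead of ctx.accept(data).

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Stand-in sketch (assumption): in a Flink job this class would implement
// SourceFunction<String> and run() would receive a SourceContext<String>.
class PollingSource {
    // volatile so that cancel(), called from another thread, is seen by run()
    private volatile boolean running = true;

    private final Supplier<String> fetcher; // hypothetical: wraps the URL fetch
    private final long periodMillis;        // hypothetical: polling interval

    PollingSource(Supplier<String> fetcher, long periodMillis) {
        this.fetcher = fetcher;
        this.periodMillis = periodMillis;
    }

    // Mirrors SourceFunction.run(ctx): it does not return until cancelled.
    void run(Consumer<String> ctx) throws InterruptedException {
        while (running) {
            String data = fetcher.get(); // fetch some data
            ctx.accept(data);            // ctx.collect(data) in Flink
            Thread.sleep(periodMillis);  // wait before the next fetch
        }
    }

    // Mirrors SourceFunction.cancel(): makes the loop in run() exit.
    void cancel() {
        running = false;
    }
}
```

With a source shaped like this, every fetched record flows into keyBy() and the 10-minute window as it arrives, rather than the window seeing only the single record emitted by a source that returned immediately.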