In NiFi, I have a data flow that consumes from MQTT (ConsumeMQTT) and publishes to an HDFS path (PutHDFS). I have a requirement to introduce a 60-minute delay before pushing the consumed data to the HDFS path. The ControlRate and MergeContent processors look like possible solutions, but I am not sure.
What is the ideal way to introduce a time delay?
Example: a flow file consumed at 9:00 AM should be published to HDFS at 10:00 AM.
You can use an ExecuteScript processor to run a sleep(60*60*1000) loop, but this would unnecessarily use system resources.
I would instead introduce a RouteOnAttribute processor which has an output relationship of one_hour_elapsed going to PutHDFS, and unmatched looped back to itself. The RouteOnAttribute processor should have Routing Strategy set to Route to Property Name and a dynamic property (click the + button on the top right of the Properties tab) named one_hour_elapsed. The Expression Language value should be ${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})}.
This expression:
- gets the current time in milliseconds since the epoch (now():toNumber())
- gets the entryDate attribute of the flowfile (when it entered NiFi), converts it to milliseconds, and adds one hour (entryDate:toNumber():plus(3600000) [3600000 == 60*60*1000])
- checks whether the first value is greater than the second (a:gt(${b}))
If this is not actually the start of your flow, you can use an UpdateAttribute processor to insert an arbitrary timestamp at any point of your flow and calculate from there.
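To make the comparison concrete, here is a minimal Python sketch of the same check the expression performs. It is only an illustration of the arithmetic, not something NiFi runs; the function name one_hour_elapsed and its parameters are made up for this example, and both timestamps are assumed to be epoch milliseconds.

```python
ONE_HOUR_MS = 60 * 60 * 1000  # 3600000, the constant used in the expression


def one_hour_elapsed(entry_date_ms: int, now_ms: int) -> bool:
    """Mirror of now > entryDate + 3600000 (hypothetical helper, epoch millis)."""
    # gt is a strict comparison, so exactly one hour does not match yet.
    return now_ms > entry_date_ms + ONE_HOUR_MS
```

Because the comparison is strict, a flow file that is exactly one hour old still goes to unmatched and is released on the next evaluation after that.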
I would also recommend setting the Yield Duration and Run Schedule of the RouteOnAttribute processor substantially higher than usual, because otherwise the processor will run constantly while doing no useful work. I'd suggest starting with 1 or 5 minutes, since you are already introducing a one-hour delay.
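The trade-off with a longer schedule is that the release time gets coarser. The Python sketch below estimates it under a simplified model that I am assuming for illustration: each queued flow file is checked once per Run Schedule interval, and nothing else (yield, penalization, back pressure) interferes. The function release_time_ms and its parameters are hypothetical names, not part of NiFi.

```python
ONE_HOUR_MS = 60 * 60 * 1000


def release_time_ms(entry_ms, first_check_ms, interval_ms, delay_ms=ONE_HOUR_MS):
    """Time of the first scheduled check strictly later than entry + delay.

    Simplified model (assumption): checks happen at first_check_ms and then
    every interval_ms, and every queued flow file is evaluated at each check.
    """
    threshold = entry_ms + delay_ms
    if first_check_ms > threshold:
        return first_check_ms
    # Whole intervals needed to move strictly past the threshold.
    steps = (threshold - first_check_ms) // interval_ms + 1
    return first_check_ms + steps * interval_ms


if __name__ == "__main__":
    five_min = 5 * 60 * 1000
    released = release_time_ms(entry_ms=0, first_check_ms=0, interval_ms=five_min)
    print((released - ONE_HOUR_MS) / 60000, "minutes of extra delay")
```

Under this model the extra delay beyond the hour is at most one schedule interval, so a 5-minute Run Schedule means a flow file is released somewhere between 60 and 65 minutes after its timestamp.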