Introduce time delay before moving flow files to next processor in NiFi

Tags:

apache-nifi

In NiFi, I have a data flow that consumes from MQTT (ConsumeMQTT) and publishes to an HDFS path (PutHDFS). I have a requirement to introduce a 60-minute delay before pushing the consumed data to the HDFS path. I found the ControlRate and MergeContent processors as possible solutions, but I am not sure either is the right fit.

What is the ideal solution to introduce time delay?

Example: A flow file consumed at 9:00 AM should be published to HDFS at 10:00 AM.

Vasanth Subramanian asked May 18 '20 17:05


1 Answer

You could use an ExecuteScript processor to call sleep(60*60*1000), but this would hold system resources for the whole hour while doing no useful work.

I would instead introduce a RouteOnAttribute processor with two outgoing relationships: one_hour_elapsed, which goes to PutHDFS, and unmatched, which loops back to the processor itself. Set the Routing Strategy to Route to Property Name and add a dynamic property (click the + button on the top right of the Properties tab) named one_hour_elapsed. Its Expression Language value should be ${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})}.

This expression:

  1. Gets the current time and converts it to milliseconds since the epoch (now():toNumber())
  2. Gets the entryDate attribute of the flowfile (when it entered NiFi), converts it to milliseconds, and adds one hour (entryDate:toNumber():plus(3600000) [3600000 == 60*60*1000])
  3. Compares the two numbers (a:gt(${b})), returning true only when the current time is later than the entry time plus one hour
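
For readers who want to check the arithmetic outside NiFi, here is a minimal Python sketch of the same comparison. It is not NiFi code: the function names to_epoch_ms and one_hour_elapsed are hypothetical, and the timestamps are made up to match the 9:00 AM example in the question.

```python
from datetime import datetime, timedelta, timezone

ONE_HOUR_MS = 60 * 60 * 1000  # 3600000, the constant used in the expression
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Milliseconds since the epoch, mirroring what toNumber() yields for a date."""
    # Integer division of two timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def one_hour_elapsed(now_ms: int, entry_date_ms: int) -> bool:
    """Mirror of now > entryDate + 3600000 (strictly greater, like gt)."""
    return now_ms > entry_date_ms + ONE_HOUR_MS


# Hypothetical flowfile that entered the flow at 9:00 AM UTC.
entry = datetime(2020, 5, 18, 9, 0, tzinfo=timezone.utc)
entry_ms = to_epoch_ms(entry)

for offset in (timedelta(minutes=59),
               timedelta(hours=1),
               timedelta(hours=1, milliseconds=1)):
    now_ms = to_epoch_ms(entry + offset)
    print(offset, one_hour_elapsed(now_ms, entry_ms))
```

Because the comparison is strict, a check made at exactly 10:00:00.000 still returns false; the flowfile is released on the first check after that instant.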

If this is not actually the start of your flow, you can use an UpdateAttribute processor to insert an arbitrary timestamp attribute at any point in the flow and calculate from that attribute instead of entryDate.

I would also recommend setting the Yield Duration and Run Schedule of the RouteOnAttribute processor substantially higher than usual. Otherwise the processor will run constantly while accomplishing nothing, since most checks will simply route the flowfile back to unmatched. I'd suggest starting with 1 or 5 minutes, as you are already introducing a one-hour delay.
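
A longer Run Schedule also means a flowfile may be released somewhat later than exactly one hour after entry. The Python sketch below estimates that effect under a deliberately simplified assumption: each queued flowfile is checked once per interval, starting at some first check time. It does not model NiFi's actual scheduler, and release_time_ms and its parameters are hypothetical names.

```python
DELAY_MS = 60 * 60 * 1000  # the one-hour delay from the expression


def release_time_ms(entry_ms: int, first_check_ms: int, interval_ms: int) -> int:
    """Time of the first periodic check at which now > entry + DELAY_MS.

    Simplified model: checks happen at first_check_ms, then every interval_ms.
    """
    threshold = entry_ms + DELAY_MS
    if first_check_ms > threshold:
        return first_check_ms
    # Smallest k such that first_check_ms + k * interval_ms > threshold.
    k = (threshold - first_check_ms) // interval_ms + 1
    return first_check_ms + k * interval_ms


MINUTE = 60 * 1000

# Flowfile enters at t=0; first check one minute later; 5-minute interval.
released = release_time_ms(0, 1 * MINUTE, 5 * MINUTE)
print(released // MINUTE)  # minutes after entry at which the flowfile is released
```

In this model the extra wait is never more than one interval, which is why an interval of 1 to 5 minutes is a reasonable trade-off against a 60-minute delay.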

Flow showing timing loop

Andy answered Sep 25 '22 22:09