Using NiFi Flowfiles as an Event Notifier

Tags:

apache-nifi

New to NiFi!

Is there a way in NiFi to send an empty flowfile with attributes set on it? I'd like to use this as a trigger to indicate that a type of event has started.

Is there any other way in NiFi to indicate that a set of events has started and finished? For instance, if I have three processors that read in data, I would like to know that the first processor is about to be triggered and that the last processor has finished. Is there any way for me to do this? If the processors continue to run, I would like to be able to group the data read from Processor1 through Processor3 in one pass. To make this more clear:

Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...

Any help would be appreciated. Thanks in advance!

BigBug asked Dec 07 '22 18:12

1 Answer

I'm going to break this answer into a few parts, as there is a lot going on here.

Is there a way in NiFi to send an empty flowfile with attributes set on it? I'd like to use this as a trigger to indicate that a type of event has started.

The GenerateFlowFile processor allows you to send an empty (or populated) flowfile at a regular run schedule or using CRON scheduling. You can combine this with the UpdateAttribute processor to add arbitrary static or dynamic attributes to the flowfile.
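As an illustration only (this is not NiFi code), a flowfile can be thought of as content bytes plus an attribute map; a "trigger" flowfile is simply one with empty content. The attribute names below (`event.type`, `event.state`) are hypothetical examples of what you might add via UpdateAttribute:

```python
# Minimal stand-in for a NiFi flowfile: content bytes plus an attribute map.
# In an actual flow you'd configure GenerateFlowFile with no custom text and
# add the attributes as dynamic properties on UpdateAttribute.

def make_trigger_flowfile(event_type):
    """Build an empty 'trigger' flowfile carrying only attributes."""
    return {
        "content": b"",                  # empty content, as from GenerateFlowFile
        "attributes": {
            "event.type": event_type,    # hypothetical attribute names
            "event.state": "started",
        },
    }

trigger = make_trigger_flowfile("ingest")
print(trigger["attributes"]["event.state"])  # -> started
```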

Is there any other way in NiFi to indicate that a set of events has started and finished? For instance, if I have three processors that read in data, I would like to know that the first processor is about to be triggered and that the last processor has finished. Is there any way for me to do this?

This comes close to batch processing, which Apache NiFi is not designed or optimized for. Determining that a source processor is "about to be triggered" is very difficult. If that processor runs on a timer/CRON schedule, you can be aware of that timing, but if you mean something like "GetFile is about to successfully retrieve a file," that's not easily doable. It is possible to extend the processor with your own custom processor and override the onTrigger() method to store some value in a DistributedMapCacheClientService that another processor can pick up on. Alternatively, you could wrap the logic in an ExecuteScript processor and write custom notification code. I'm not sure of the target here -- who gets notified of this state change? Is it another processor, a human observer, or an external service?
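To sketch the "store a marker in a shared cache" idea, here is a plain-Python stand-in where a dict plays the role of the DistributedMapCacheServer; in a real flow the custom processor or ExecuteScript body would talk to a DistributedMapCacheClientService instead, and the method and key names here are invented for illustration:

```python
# Illustrative sketch: a shared cache that the first and last processors in a
# chain update, so any other component can inspect the batch's state.
# A dict stands in for NiFi's distributed map cache.

class EventStateCache:
    def __init__(self):
        self._cache = {}

    def mark_started(self, batch_id):
        # What the first processor's onTrigger() would record.
        self._cache[batch_id] = "started"

    def mark_finished(self, batch_id):
        # What the last processor would record on success.
        self._cache[batch_id] = "finished"

    def state(self, batch_id):
        return self._cache.get(batch_id, "unknown")

cache = EventStateCache()
cache.mark_started("batch-001")   # Processor1 begins the batch
# ... Processor1 -> Processor2 -> Processor3 run ...
cache.mark_finished("batch-001")  # Processor3 completes the batch
print(cache.state("batch-001"))   # -> finished
```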

If the processors continue to run, I would like to be able to group the data read from Processor1 through Processor3 in one pass. To make this more clear:

Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...

I believe what you are asking for here is possible with the Wait and Notify processors. Koji Kawamura has written a good article describing their use here.

I think in this case you need special content or attributes to be able to detect the batches coming through the system, unless it is a single unit of data at a time. I'll try to describe two scenarios below, but I don't have much context on this.

Scenario 1 (Single unit of data)

Feel free to substitute a different source processor, but I'm using GetFile for simplicity's sake.

Let's say you have a directory full of text files (placed there by some external process). Each file contains text in the form "Firstname Lastname" and is named Lastname_YYYY-MM-DD-HH-mm-ss.txt, with the timestamp at which it was written populating the filename.

GetFile -> ReplaceText -> PutFile

The GetFile processor will bring in each file as a separate flowfile. From there, ReplaceText can do something simple like use a regex to switch the order of the names, and PutFile writes the contents back out to the file system. When GetFile is triggered the first time, it will dispatch n flowfiles to the connection/queue to ReplaceText. If you want it to wait and perform the actions linearly instead of in parallel, you can set the back pressure on the success queue to 1 flowfile to prevent the preceding processor (GetFile) from running until the queue is empty again.
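The back-pressure setting above can be pictured as a bounded queue between the two processors. A minimal sketch, using Python's queue module as a stand-in for the NiFi connection:

```python
from queue import Queue

# Back-pressure sketch: a connection with an object threshold of 1 means the
# upstream "processor" is not scheduled to run again until the downstream
# consumer drains the queue, forcing one-at-a-time (linear) processing
# instead of a parallel fan-out.

connection = Queue(maxsize=1)  # "back pressure object threshold = 1"

connection.put("file-1.txt")   # GetFile dispatches one flowfile
assert connection.full()       # GetFile is now held back by back pressure

item = connection.get()        # ReplaceText consumes the flowfile
assert connection.empty()      # the queue drained, so GetFile may run again
print(item)                    # -> file-1.txt
```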

Scenario 2 (multiple flowfiles must be grouped together and operated on in conjunction)

Here you'll want to use MergeContent to collect multiple flowfiles into a single one. You can set the bin threshold to n flowfiles, and the MergeContent processor will only transmit a success flowfile when it has reached the minimum number of incoming flowfiles. You can also bin by attributes, so if you're reading from a heterogeneous input source, you can still correlate associated pieces of data based on a common feature.
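The binning behavior can be sketched as follows (a simplification of what MergeContent does internally; the `batch.id` attribute name is an invented example of a correlation attribute):

```python
from collections import defaultdict

# Sketch of MergeContent-style binning: flowfiles accumulate in bins keyed by
# a correlation attribute, and a bin is "released" (merged) once it holds the
# minimum number of entries.

def bin_flowfiles(flowfiles, correlation_attr, min_entries):
    bins = defaultdict(list)
    merged = []
    for ff in flowfiles:
        key = ff["attributes"][correlation_attr]
        bins[key].append(ff)
        if len(bins[key]) == min_entries:   # bin threshold reached -> merge
            merged.append(bins.pop(key))
    return merged

flowfiles = [
    {"attributes": {"batch.id": "A"}, "content": b"1"},
    {"attributes": {"batch.id": "B"}, "content": b"x"},
    {"attributes": {"batch.id": "A"}, "content": b"2"},
    {"attributes": {"batch.id": "A"}, "content": b"3"},
]
merged = bin_flowfiles(flowfiles, "batch.id", min_entries=3)
print(len(merged), len(merged[0]))  # -> 1 3  (batch A merged; B still waiting)
```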

Alternate Scenario with Wait & Notify

In addition, you can use the Notify processor to send a trigger flowfile to the corresponding Wait processor to "release" the "content" flowfiles to their desired destination. Again, Koji's article linked above explains this in detail with an example flow and a number of screenshots.
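The Wait/Notify handshake can be sketched like this, with a dict standing in for the distributed cache both processors share (the signal identifier and function names here are illustrative, not NiFi API):

```python
# Sketch of the Wait/Notify pattern: "content" flowfiles are held at a Wait
# processor until a Notify processor has incremented a shared counter for the
# same release-signal identifier.

signal_cache = {}  # release-signal id -> notify count

def notify(signal_id):
    """What the Notify processor does: increment the signal counter."""
    signal_cache[signal_id] = signal_cache.get(signal_id, 0) + 1

def wait(held_flowfiles, signal_id, target_count):
    """What the Wait processor does: release held flowfiles once the
    counter reaches the target; otherwise keep holding them."""
    if signal_cache.get(signal_id, 0) >= target_count:
        released, held_flowfiles[:] = held_flowfiles[:], []
        return released
    return []

held = ["ff-1", "ff-2"]
assert wait(held, "batch-001", target_count=1) == []  # nothing released yet
notify("batch-001")                                   # trigger flowfile arrives
print(wait(held, "batch-001", target_count=1))        # -> ['ff-1', 'ff-2']
```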

I hope this at least gives you a direction to follow. Without more context, I still get the sense that you're trying to solve a non-NiFi problem here, or maybe could adapt your dataflow model to better support a streaming mentality. If you have more information, I'm happy to expand the answer.

Andy answered Mar 16 '23 20:03