New to NiFi!
I was wondering if there is a way to send an empty flowfile with attributes set on it in NiFi? I'd like to use this as a trigger to indicate that a type of event has started.
In NiFi, is there any other way for me to indicate that a set of events has started and finished? For instance, if I have three processors that read in data, I would like to know that the first processor is about to be triggered and that the last processor has finished. Is there any way for me to do this? If the processors continue to run, I would like to be able to group the data read from processor 1 through processor 3 in one pass. To make this clearer:
Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...
Any help would be appreciated. Thanks in advance!
I'm going to break this answer into a few parts, as there is a lot going on here.
I was wondering if there is a way to send an empty flowfile with attributes set on it in NiFi? I'd like to use this as a trigger to indicate that a type of event has started.
The GenerateFlowFile processor allows you to send an empty (or populated) flowfile on a regular run schedule or using CRON scheduling. You can combine this with the UpdateAttribute processor to add arbitrary static or dynamic attributes to the flowfile.
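For example, an UpdateAttribute processor could add dynamic properties like the following (the property names here are illustrative; now(), format(), and UUID() are standard NiFi Expression Language functions):

```
event.type   = batch_start
batch.id     = ${UUID()}
trigger.time = ${now():format('yyyy-MM-dd HH:mm:ss')}
```

Any downstream processor can then inspect these attributes to recognize the trigger flowfile.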
In NiFi, is there any other way for me to indicate that a set of events has started and finished? For instance, if I have three processors that read in data, I would like to know that the first processor is about to be triggered and that the last processor has finished. Is there any way for me to do this?
This is coming close to batch processing, which Apache NiFi is neither designed nor optimized for. Determining that a source processor is "about to be triggered" is very difficult. If that processor is triggered on a timer/CRON basis, you can be aware of that timing, but if you mean something like "GetFile is about to successfully retrieve a file," that's not easily doable. It is possible to extend the processor with your own custom processor and override the onTrigger() method to store some value in a DistributedMapCacheClientService that another processor can pick up on. Or you could wrap the logic in an ExecuteScript processor and write custom notification code. I'm not sure of the target here -- who gets notified of this state change? Is it another processor, a human observer, or an external service?
If the processors continue to run, I would like to be able to group the data read from processor 1 through processor 3 in one pass. To make this clearer:
Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...
However, I believe what you are asking for is possible with the use of the new Wait and Notify processors. Koji Kawamura has written a good article describing their use here.
I think in this case you need special content or attributes to be able to detect the batches coming through the system, unless each batch is a single unit of data at a time. I'll try to describe two scenarios below, but I don't have much context on this. Feel free to substitute a different source processor, but I'm using GetFile for simplicity's sake.
Let's say you have a directory full of text files (placed there by some external process). Each file contains text in the form "Firstname Lastname" and is named Lastname_YYYY-MM-DD-HH-mm-ss.txt, with the timestamp at which it was written populating the filename.
GetFile -> ReplaceText -> PutFile
The GetFile processor will bring in each file as a separate flowfile. From there, ReplaceText can do something simple like use a regex to switch the order of the names, and PutFile writes the contents back out to the file system. When GetFile is triggered the first time, it will dispatch n flowfiles to the connection/queue feeding ReplaceText. If you want the flow to perform the actions linearly instead of in parallel, you can set back pressure on the success queue to 1 flowfile, which prevents the preceding processor (GetFile) from running until the queue is empty again.
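The kind of regex ReplaceText would apply can be sketched outside NiFi. This Python snippet is illustrative only (ReplaceText uses Java regex, which behaves the same for this pattern); it shows the capture-group swap:

```python
import re

# Pattern a ReplaceText processor could use: capture the two
# whitespace-separated name tokens and emit them in reverse order.
NAME_PATTERN = re.compile(r"^(\S+)\s+(\S+)$")

def swap_names(content: str) -> str:
    """Rewrite 'Firstname Lastname' as 'Lastname Firstname'."""
    return NAME_PATTERN.sub(r"\2 \1", content.strip())

print(swap_names("Jane Doe"))  # -> Doe Jane
```

In ReplaceText itself, the Search Value would be the pattern and the Replacement Value would use `$2 $1` (Java-style group references).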
If instead a batch spans multiple flowfiles, you'll want to use MergeContent to collect them into a single flowfile. You can set the bin threshold to n flowfiles, and the MergeContent processor will only transmit a merged flowfile to success once it has received the minimum number of incoming flowfiles. You can also bin by attributes, so if you're reading from a heterogeneous input source, you can still correlate associated pieces of data based on a common feature.
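The binning behavior can be illustrated with a small sketch. This is not NiFi's API, just the concept: flowfiles accumulate in per-attribute bins, and a bin is released only once it reaches the minimum entry count (roughly how MergeContent's Correlation Attribute Name and Minimum Number of Entries properties interact):

```python
from collections import defaultdict

def bin_flowfiles(flowfiles, correlation_attr, min_entries):
    """Group flowfiles by a correlation attribute; release a bin
    only once it holds at least min_entries flowfiles."""
    bins = defaultdict(list)
    released = []
    for ff in flowfiles:
        key = ff["attributes"].get(correlation_attr)
        bins[key].append(ff)
        if len(bins[key]) >= min_entries:
            # Bin is full: in NiFi this is where the merged
            # flowfile would be transmitted to the success relationship.
            released.append(bins.pop(key))
    return released, dict(bins)

flowfiles = [
    {"attributes": {"batch.id": "a"}, "content": "1"},
    {"attributes": {"batch.id": "b"}, "content": "2"},
    {"attributes": {"batch.id": "a"}, "content": "3"},
]
released, pending = bin_flowfiles(flowfiles, "batch.id", min_entries=2)
print(len(released), len(pending))  # one bin released, one still waiting
```

The `batch.id` attribute name is an assumption for the example; any attribute common to a batch works as the correlation key.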
Wait & Notify
In addition, you can use the Notify processor to send a trigger flowfile to the corresponding Wait processor to "release" the "content" flowfiles to their desired destination. Again, Koji's article linked above explains this in detail with an example flow and a number of screenshots.
I hope this at least gives you a direction to follow. Without more context, I still get the sense that you're trying to solve a non-NiFi problem here, or maybe could adapt your dataflow model to better support a streaming mentality. If you have more information, I'm happy to expand the answer.