
Apache NiFi: Output to multiple FlowFiles simultaneously?

Is there a way to write to several output streams simultaneously in a custom NiFi processor? For instance, I have third-party libraries that do significant processing through APIs that work something like this:

public void process(InputStream in, OutputStream foo, OutputStream baa, List<String> args)
{
    ...
    foo.write(things);
    baa.write(stuff);
    ...
}

But the only examples I can find all just use one output stream:

FlowFile transform = session.write(original, new OutputStreamCallback() {
    @Override
    public void process(OutputStream out) throws IOException {
        // OutputStream.write takes bytes, not a String
        out.write("stuff".getBytes(StandardCharsets.UTF_8));
    }
});

Processing is done in batches because of the large scale, so it's not practical to perform all the processing first and then write out the separate flows.

The only approach I can come up with is to process the input multiple times :(

To clarify, I want to write to multiple FlowFiles using the session.write(flowfile, callback) method, so that the different streams can be sent and managed separately.

asked Mar 11 '23 17:03 by foobarking


2 Answers

The NiFi API is based on acting upon one flow file at a time, but you should be able to do something like this:

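        FlowFile flowFile1 = session.create();
        // The holder lets the inner callback hand the updated second FlowFile back out
        final AtomicReference<FlowFile> holder = new AtomicReference<>(session.create());

        flowFile1 = session.write(flowFile1, new OutputStreamCallback() {
            @Override
            public void process(final OutputStream out1) throws IOException {
                // Open the second FlowFile's stream while the first is still open
                FlowFile flowFile2 = session.write(holder.get(), new OutputStreamCallback() {
                    @Override
                    public void process(final OutputStream out2) throws IOException {
                        // Both out1 and out2 are writable here, e.g. pass them to the library
                    }
                });
                holder.set(flowFile2);
            }
        });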
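
To see why nesting the callbacks gives you two open streams at once, here is a minimal self-contained sketch of the same shape in plain Java. Everything in it is a stand-in and an assumption, not the NiFi API: StreamCallback imitates OutputStreamCallback, write(...) imitates session.write(flowFile, callback) using an in-memory buffer, and process(...) is a made-up version of the third-party method from the question that routes digits to one stream and everything else to the other.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

class NestedWriteSketch {

    /** Hypothetical stand-in for NiFi's OutputStreamCallback. */
    interface StreamCallback {
        void process(OutputStream out) throws IOException;
    }

    /** Hypothetical stand-in for session.write(flowFile, callback): open a sink, run the callback, return the bytes. */
    static byte[] write(StreamCallback callback) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        callback.process(sink);
        return sink.toByteArray();
    }

    /** Made-up third-party method: reads the input once, in small batches, and writes to two outputs. */
    static void process(InputStream in, OutputStream foo, OutputStream baa) throws IOException {
        byte[] batch = new byte[4];
        int n;
        while ((n = in.read(batch)) != -1) {
            for (int i = 0; i < n; i++) {
                byte b = batch[i];
                if (b >= '0' && b <= '9') {
                    baa.write(b);   // digits go to the second stream
                } else {
                    foo.write(b);   // everything else goes to the first stream
                }
            }
        }
    }

    /** Returns { contents of first output, contents of second output } after a single pass over the input. */
    static String[] splitInOnePass(String input) throws IOException {
        InputStream in = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
        // Plays the same role as the AtomicReference holder in the answer above
        AtomicReference<byte[]> holder = new AtomicReference<>();

        byte[] first = write(fooOut -> {
            // The inner write opens the second stream while fooOut is still open
            byte[] second = write(baaOut -> process(in, fooOut, baaOut));
            holder.set(second);
        });

        return new String[] {
            new String(first, StandardCharsets.UTF_8),
            new String(holder.get(), StandardCharsets.UTF_8)
        };
    }
}
```

Calling NestedWriteSketch.splitInOnePass("a1b2c3") reads the input a single time and fills both outputs from inside the innermost callback. In a real processor the two results would be the FlowFiles returned by session.write, each of which can then be transferred to its own relationship.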

answered Mar 20 '23 15:03 by Bryan Bende


Since you're producing different outputs from the same input, you might also consider breaking these steps out into discrete processors that each focus on one specific function. Above you show "things" and "stuff", so for example I'm suggesting a 'DoThings' processor and a 'DoStuff' processor. In your flow you can send the same flowfile to both simply by using the source connection twice, once for each processor. This enables nice parallel operation and allows the two to have different runtimes, scheduling, and so on. NiFi will still maintain the provenance trail for you, and it won't actually copy the bytes at all; it passes a pointer to the original content.
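
As a rough illustration of the "pointer, not a copy" idea, here is a small plain-Java sketch. It is only a model under stated assumptions: Ref is a hypothetical stand-in for a flowfile that holds its content by reference, and doThings and doStuff are invented placeholders for the two processors. None of this is NiFi code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

class FanOutSketch {

    /** Hypothetical stand-in for a flowfile: it points at content rather than owning a private copy. */
    static final class Ref {
        final byte[] content;   // shared and treated as read-only

        Ref(byte[] content) {
            this.content = content;
        }

        static Ref of(String text) {
            return new Ref(text.getBytes(StandardCharsets.UTF_8));
        }

        /** Sending the same flowfile down a second connection: a new handle, same underlying bytes. */
        Ref fanOut() {
            return new Ref(content);
        }
    }

    /** Placeholder for a 'DoThings' processor: reads the shared content and produces its own output. */
    static String doThings(Ref f) {
        return new String(f.content, StandardCharsets.UTF_8).toUpperCase(Locale.ROOT);
    }

    /** Placeholder for a 'DoStuff' processor: reads the same content independently. */
    static String doStuff(Ref f) {
        return new StringBuilder(new String(f.content, StandardCharsets.UTF_8)).reverse().toString();
    }

    /** True when both handles point at the very same byte array. */
    static boolean sharesContent(Ref a, Ref b) {
        return a.content == b.content;
    }
}
```

Each downstream step gets its own handle and produces its own result, while the input bytes exist only once. The trade-off compared with the nested-callback approach is that each processor reads the input separately, which may or may not be acceptable for the batch sizes described in the question.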

answered Mar 20 '23 15:03 by Joe Witt