I'm looking into ETL tools (like Talend) and investigating whether Apache NiFi could be used. Specifically, could NiFi be used to join two CSV files on a common column?
I've tried setting up a job in NiFi, but couldn't see how to perform the join of two separate CSV files. Is this task possible in Apache NiFi?
It looks like the QueryDNS processor could be used to perform enrichment of one CSV file using the other, but that seems to be over-complicated for this use case.
Here's an example of the input CSVs, which need to be joined on state_id:
customers.csv
id | name | address | state_id
---|------|--------------|---------
1 | John | 10 Blue Lane | 100
2 | Bob | 15 Green St. | 200
states.csv
state_id | state
---------|---------
100 | Alabama
200 | New York
output.csv
id | name | address | state
---|------|--------------|---------
1 | John | 10 Blue Lane | Alabama
2 | Bob | 15 Green St. | New York
When data is transferred to a clustered instance of NiFi via a Remote Process Group (RPG), the RPG first connects to the remote instance at the configured URL to determine which nodes are in the cluster and how busy each node is. This information is then used to load-balance the data that is pushed to each node.
NiFi provides several built-in processors for data processing, conversion, transformation, and enrichment, some of which are particularly useful for data model transformation.
Apache NiFi is more of a dataflow tool and is not really designed to perform arbitrary joins of streaming data. Those types of operations are typically better suited to stream processing systems like Storm, Flink, or Apex, or to ETL tools.
The types of joins that NiFi can do well are enrichment lookups, where there is a fixed-size lookup dataset and, for each record in the incoming data, you use the lookup dataset to retrieve some value. For example, in your case there could be a processor called LookUpState with a property "State Data" that points to a file containing all the states; customers.csv would then be the input to this processor.
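To make the lookup idea concrete, here is a minimal sketch in plain Python rather than NiFi. It assumes both files are ordinary comma-separated CSVs with the headers shown in the question; the function name `enrich_customers` and the choice to emit an empty string for an unknown state_id are illustrative assumptions, not anything NiFi provides.

```python
import csv
import io


def enrich_customers(customers_csv: str, states_csv: str) -> str:
    """Replace each customer's state_id with the state name from the lookup data."""
    # Build the fixed-size lookup table once: state_id -> state name.
    lookup = {row["state_id"]: row["state"]
              for row in csv.DictReader(io.StringIO(states_csv))}

    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=["id", "name", "address", "state"],
                            lineterminator="\n")
    writer.writeheader()
    for row in csv.DictReader(io.StringIO(customers_csv)):
        writer.writerow({
            "id": row["id"],
            "name": row["name"],
            "address": row["address"],
            # Assumption: an unknown state_id yields an empty state, not an error.
            "state": lookup.get(row["state_id"], ""),
        })
    return out.getvalue()


# Sample data taken from the tables in the question, written as real CSV.
CUSTOMERS = "id,name,address,state_id\n1,John,10 Blue Lane,100\n2,Bob,15 Green St.,200\n"
STATES = "state_id,state\n100,Alabama\n200,New York\n"

if __name__ == "__main__":
    print(enrich_customers(CUSTOMERS, STATES), end="")
```

This only works because the lookup side is small enough to hold in memory, which is the same constraint that makes the enrichment-style join a reasonable fit for NiFi.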
A community member started a project to make a generic lookup service for NiFi: https://github.com/jfrazee/nifi-lookup-service
The typical pattern one follows for this is to load the reference set into a map cache controller service in NiFi. In this case that is the states.csv data. Then the live feed of customer data comes in and is enriched with this reference data using something like ReplaceText, or you could even write a custom processor in Groovy. There are a lot of ways to slice this. There is also a JIRA/PR coming for making this even easier. There are elements of live stream joins that are best done in processing systems like Apache Storm, Spark, and Flink, but for the case you mention it can be done well in NiFi.
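The two-step pattern (load the reference set once, then enrich each record as it arrives) can be sketched roughly as follows. This is plain Python, not NiFi code: `MapCache` is a hypothetical in-memory stand-in for a map cache service, and `load_reference`, `enrich`, and the "UNKNOWN" fallback are assumptions made for illustration.

```python
from typing import Dict, Iterable, Iterator, Optional


class MapCache:
    """Hypothetical in-memory stand-in for a map cache service."""

    def __init__(self) -> None:
        self._entries: Dict[str, str] = {}

    def put(self, key: str, value: str) -> None:
        self._entries[key] = value

    def get(self, key: str) -> Optional[str]:
        return self._entries.get(key)


def load_reference(cache: MapCache, rows: Iterable[Dict[str, str]]) -> None:
    # Step 1: load the reference set (the states.csv rows) into the cache once.
    for row in rows:
        cache.put(row["state_id"], row["state"])


def enrich(cache: MapCache,
           records: Iterable[Dict[str, str]]) -> Iterator[Dict[str, str]]:
    # Step 2: enrich each incoming customer record as it arrives.
    for record in records:
        enriched = {k: v for k, v in record.items() if k != "state_id"}
        # Assumption: cache misses are labelled "UNKNOWN" instead of being dropped.
        enriched["state"] = cache.get(record["state_id"]) or "UNKNOWN"
        yield enriched


if __name__ == "__main__":
    cache = MapCache()
    load_reference(cache, [{"state_id": "100", "state": "Alabama"},
                           {"state_id": "200", "state": "New York"}])
    feed = [{"id": "1", "name": "John", "address": "10 Blue Lane", "state_id": "100"}]
    for row in enrich(cache, feed):
        print(row)
```

Because `enrich` is a generator, records are processed one at a time and only the reference data stays in memory, which mirrors how a live feed would pass through the flow.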