I have a flow QueryDatabaseTable->ConvertRecord->PutElasticsearchHttpRecord. What I am trying to do is fetch the full data from a MySQL database and feed it into Elasticsearch so I can perform analytics on it with Kibana. However, my data has duplicate rows like the ones below (apart from ID and Date, the values repeat):
ID,Machine Name,value1,Value2,Date
1, abc, 10, 34, 2018-09-27 10:40:10
2, abc, 10, 34, 2018-09-27 10:41:14
3, abc, 10, 34, 2018-09-27 10:42:19
4, xyz, 12, 45, 2018-09-27 10:45:19
In my table, ID is the primary key and the Date (timestamp) field keeps updating. What I want to achieve is to fetch only one record per Machine Name. The example below shows the output table I want:
ID,Machine Name,value1,Value2,Date
1, abc, 10, 34, 2018-09-27 10:40:10
4, xyz, 12, 45, 2018-09-27 10:45:19
How can I achieve this in NiFi? The objective is to drop the duplicate rows. If it's possible, please tell me which processor to use and what configuration to set.
I am getting the following error in the QueryRecord processor:
Any suggestion is much appreciated. Thank you.
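Instead of using the ConvertRecord processor, use the QueryRecord processor.
Add a new SQL query (as a user-defined property on QueryRecord; the property name becomes the outgoing relationship) that uses the row_number() window function partitioned by Machine Name (and value1, value2, etc. if needed),
and select only the first row of each partition.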
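To make the "first row of each partition" idea concrete, here is a minimal pure-Python sketch of what ROW_NUMBER() OVER (PARTITION BY machine name ORDER BY date ASC) ... WHERE rn = 1 keeps. It is not NiFi code; the function name first_row_per_machine and the tuple layout of the rows are assumptions made for illustration, and the rows are the sample data from the question.

```python
# Sample rows from the question: (id, machine name, value1, value2, date)
ROWS = [
    (1, "abc", 10, 34, "2018-09-27 10:40:10"),
    (2, "abc", 10, 34, "2018-09-27 10:41:14"),
    (3, "abc", 10, 34, "2018-09-27 10:42:19"),
    (4, "xyz", 12, 45, "2018-09-27 10:45:19"),
]


def first_row_per_machine(rows):
    """Emulate ROW_NUMBER() OVER (PARTITION BY machine ORDER BY date ASC) ... WHERE rn = 1."""
    partitions = {}
    for row in rows:
        # Partition key: the machine name column (index 1).
        partitions.setdefault(row[1], []).append(row)
    kept = []
    for members in partitions.values():
        # "yyyy-mm-dd hh:mm:ss" strings sort chronologically, so the row with
        # the smallest date string is the one that would get rn = 1.
        kept.append(min(members, key=lambda r: r[4]))
    return kept


if __name__ == "__main__":
    for row in first_row_per_machine(ROWS):
        print(row)
```

Rows 2 and 3 share machine name "abc" with row 1 but have later dates, so only rows 1 and 4 survive, matching the desired output table.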
QueryRecord Configs:
I tried it with the CSVReader and JsonRecordSetWriter controller services.
Query:
select id,machinename,value1,value2,"date" from(
SELECT id,
machinename,value1,value2,"date",
row_number() over (partition by machinename order by "date" asc) as rn
from FLOWFILE
) sq
WHERE rn = 1
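One way to sanity-check the query outside NiFi is to run it against an in-memory SQLite table named FLOWFILE loaded with the sample rows. This is only a sketch under assumptions: SQLite stands in for the Calcite engine QueryRecord actually uses, it needs SQLite 3.25 or newer for window functions, the column types are chosen here (CSVReader would hand QueryRecord strings, which is why the output above shows "id" : "1"), and the trailing ORDER BY id is added only to make the result order deterministic.

```python
import sqlite3

# Sample rows from the question: (id, machinename, value1, value2, date)
ROWS = [
    (1, "abc", 10, 34, "2018-09-27 10:40:10"),
    (2, "abc", 10, 34, "2018-09-27 10:41:14"),
    (3, "abc", 10, 34, "2018-09-27 10:42:19"),
    (4, "xyz", 12, 45, "2018-09-27 10:45:19"),
]

DEDUP_QUERY = """
select id, machinename, value1, value2, "date" from (
  SELECT id,
  machinename, value1, value2, "date",
  row_number() over (partition by machinename order by "date" asc) as rn
  from FLOWFILE
) sq
WHERE rn = 1
ORDER BY id  -- added only so the sketch's output order is deterministic
"""


def run_dedup_query(rows):
    # An in-memory table called FLOWFILE plays the role of QueryRecord's FLOWFILE table.
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            'CREATE TABLE FLOWFILE (id INTEGER, machinename TEXT, '
            'value1 INTEGER, value2 INTEGER, "date" TEXT)'
        )
        conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?, ?, ?, ?)", rows)
        return conn.execute(DEDUP_QUERY).fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    for row in run_dedup_query(ROWS):
        print(row)
```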
Output:
[ {
"id" : "1",
"machinename" : "abc",
"value1" : "10",
"value2" : "34",
"date" : "2018-09-27 10:40:10"
}, {
"id" : "4",
"machinename" : "xyz",
"value1" : "12",
"value2" : "45",
"date" : "2018-09-27 10:45:19"
} ]
Flow:
QueryDatabaseTable->QueryRecord->PutElasticsearchHttpRecord
In case your machine name column name contains a space,
enclose the column name in double quotes ("<col_name>")
and use the query below:
select id,"machine name",value1,value2,"date" from(
SELECT id,
"machine name",value1,value2,"date",
row_number() over (partition by "machine name" order by "date" asc) as rn
from FLOWFILE
) sq
WHERE rn = 1
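The sketch below illustrates why the quotes matter, using SQLite as a stand-in for QueryRecord's SQL engine (an assumption; QueryRecord itself is Calcite-based, but both treat double-quoted text as an identifier here). With the quotes, a column literally named machine name is usable; without them, SQLite parses `machine name` as a column `machine` aliased to `name` and fails. The helper names run and unquoted_fails are hypothetical, and window functions need SQLite 3.25 or newer.

```python
import sqlite3

# Sample rows from the question: (id, machine name, value1, value2, date)
ROWS = [
    (1, "abc", 10, 34, "2018-09-27 10:40:10"),
    (2, "abc", 10, 34, "2018-09-27 10:41:14"),
    (3, "abc", 10, 34, "2018-09-27 10:42:19"),
    (4, "xyz", 12, 45, "2018-09-27 10:45:19"),
]

QUOTED_QUERY = """
select id, "machine name", value1, value2, "date" from (
  SELECT id,
  "machine name", value1, value2, "date",
  row_number() over (partition by "machine name" order by "date" asc) as rn
  from FLOWFILE
) sq
WHERE rn = 1
ORDER BY id  -- added only for a deterministic result order
"""


def run(query):
    # Build a FLOWFILE table whose column name contains a space.
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            'CREATE TABLE FLOWFILE (id INTEGER, "machine name" TEXT, '
            'value1 INTEGER, value2 INTEGER, "date" TEXT)'
        )
        conn.executemany("INSERT INTO FLOWFILE VALUES (?, ?, ?, ?, ?)", ROWS)
        return conn.execute(query).fetchall()
    finally:
        conn.close()


def unquoted_fails():
    # Unquoted, `machine name` means column `machine` with alias `name`,
    # and there is no column called `machine`.
    try:
        run("SELECT id, machine name FROM FLOWFILE")
    except sqlite3.OperationalError:
        return True
    return False


if __name__ == "__main__":
    print(run(QUOTED_QUERY))
    print("unquoted query fails:", unquoted_fails())
```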
I have included only the machine name
column in the partition by clause, but you can include other columns such as value1 and value2
as per your requirements.
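Adding value1 and value2 to the partition changes what counts as a duplicate: a row is then dropped only if machine name, value1 and value2 all match an earlier row. The pure-Python sketch below shows the difference. Row 5 is a hypothetical extra reading invented for illustration (it is not in the question's data), and first_row_per_partition is a hypothetical helper, not a NiFi API.

```python
# Sample rows from the question plus one hypothetical row (id 5).
ROWS = [
    (1, "abc", 10, 34, "2018-09-27 10:40:10"),
    (2, "abc", 10, 34, "2018-09-27 10:41:14"),
    (3, "abc", 10, 34, "2018-09-27 10:42:19"),
    (4, "xyz", 12, 45, "2018-09-27 10:45:19"),
    # Hypothetical: same machine as rows 1-3, but value1 changed.
    (5, "abc", 11, 34, "2018-09-27 10:50:00"),
]

# Column positions within each row tuple.
MACHINE, VALUE1, VALUE2, DATE = 1, 2, 3, 4


def first_row_per_partition(rows, key_columns):
    """Keep the earliest row for each distinct combination of key_columns."""
    partitions = {}
    for row in rows:
        key = tuple(row[i] for i in key_columns)
        partitions.setdefault(key, []).append(row)
    # ISO-style date strings sort chronologically, so min() picks rn = 1.
    return [min(members, key=lambda r: r[DATE]) for members in partitions.values()]


if __name__ == "__main__":
    by_machine = first_row_per_partition(ROWS, [MACHINE])
    by_all = first_row_per_partition(ROWS, [MACHINE, VALUE1, VALUE2])
    print([r[0] for r in by_machine])
    print([r[0] for r in by_all])
```

Partitioning by machine name alone keeps ids 1 and 4, while partitioning by all three columns also keeps id 5 because its value1 differs.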
Use this template for more reference.