
Remove duplicates in NiFi

I have a flow QueryDatabaseTable->ConvertRecord->PutElasticsearchHttpRecord. I am trying to fetch the full data from a MySQL database and feed it into Elasticsearch so that I can run analytics on it with Kibana. However, my data contains duplicate rows like the ones below (only the Machine Name, value1 and Value2 values repeat; ID and Date differ on every row):

ID,Machine Name,value1,Value2,Date

1, abc, 10, 34, 2018-09-27 10:40:10

2, abc, 10, 34, 2018-09-27 10:41:14

3, abc, 10, 34, 2018-09-27 10:42:19

4, xyz, 12, 45, 2018-09-27 10:45:19

In my table, ID is the primary key and the Date (timestamp) field keeps updating. What I want is to fetch only one record per Machine Name. The example below shows the output table I want:

ID,Machine Name,value1,Value2,Date

1, abc, 10, 34, 2018-09-27 10:40:10

4, xyz, 12, 45, 2018-09-27 10:45:19

How can I achieve this in NiFi? The objective is to drop the duplicate rows. If it is possible, please tell me which processor to use and which configs to set.

I am getting the following errors in the QueryRecord processor (screenshots: QueryRecord Error, QueryRecord Error2).

Any suggestions are much appreciated. Thank you.

Shrads asked Oct 17 '22 12:10


1 Answer

Instead of the ConvertRecord processor, use the QueryRecord processor.

Add a new SQL query that uses the row_number() window function, partitions by Machine Name (plus value1, value2, etc. if needed), and selects only the first row in each partition.

QueryRecord Configs:

I tested this with CSVReader and JsonRecordSetWriter controller services.

Query:

select id, machinename, value1, value2, "date" from (
  select id,
         machinename, value1, value2, "date",
         row_number() over (partition by machinename order by "date" asc) as rn
  from FLOWFILE
) sq
where rn = 1


Output:

[ {
  "id" : "1",
  "machinename" : "abc",
  "value1" : "10",
  "value2" : "34",
  "date" : "2018-09-27 10:40:10"
}, {
  "id" : "4",
  "machinename" : "xyz",
  "value1" : "12",
  "value2" : "45",
  "date" : "2018-09-27 10:45:19"
} ]
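To check the query logic outside NiFi, here is a minimal sketch that runs the same row_number() query against the question's sample rows in an in-memory SQLite database. The assumptions: SQLite stands in for QueryRecord's SQL engine, the table name `flowfile` stands in for FLOWFILE, the helper name `first_row_per_machine` is hypothetical, and an `ORDER BY id` has been added only to make the printed order deterministic. It needs SQLite 3.25 or newer for window functions.

```python
import sqlite3

# Sample rows copied from the question.
rows = [
    (1, "abc", 10, 34, "2018-09-27 10:40:10"),
    (2, "abc", 10, 34, "2018-09-27 10:41:14"),
    (3, "abc", 10, 34, "2018-09-27 10:42:19"),
    (4, "xyz", 12, 45, "2018-09-27 10:45:19"),
]

# Same shape as the QueryRecord query; "flowfile" is an illustrative table
# name and the trailing ORDER BY is an addition for stable output.
QUERY = """
SELECT id, machinename, value1, value2, "date"
FROM (
    SELECT id, machinename, value1, value2, "date",
           row_number() OVER (PARTITION BY machinename ORDER BY "date" ASC) AS rn
    FROM flowfile
) sq
WHERE rn = 1
ORDER BY id
"""


def first_row_per_machine(data):
    """Load the rows into an in-memory table and keep the earliest row per machine."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            'CREATE TABLE flowfile '
            '(id INTEGER, machinename TEXT, value1 INTEGER, value2 INTEGER, "date" TEXT)'
        )
        conn.executemany("INSERT INTO flowfile VALUES (?, ?, ?, ?, ?)", data)
        return conn.execute(QUERY).fetchall()
    finally:
        conn.close()


deduped = first_row_per_machine(rows)
for row in deduped:
    print(row)
```

Because the rows are ordered by "date" ascending inside each partition, the row kept for each machine is the earliest one. Switching to `desc` would keep the latest instead.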

Flow:

QueryDatabaseTable->QueryRecord->PutElasticsearchHttpRecord

If the machine name column has a space in its name, enclose the column name in double quotes ("<col_name>") and use the query below:

select id, "machine name", value1, value2, "date" from (
  select id,
         "machine name", value1, value2, "date",
         row_number() over (partition by "machine name" order by "date" asc) as rn
  from FLOWFILE
) sq
where rn = 1

I have included only the machine name column in the partition by clause, but you can add other columns such as value1 and value2, depending on your requirements.
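The choice of partition columns changes which rows count as duplicates. The sketch below imitates the partition-and-keep-first logic in plain Python rather than in QueryRecord. The helper `first_per_key` and the fifth record are hypothetical, added only to show the difference. It compares dates as strings, which assumes the fixed `YYYY-MM-DD HH:MM:SS` format used in the question.

```python
def first_per_key(records, key_fields, order_field="date"):
    """Keep the earliest record (by order_field) for each combination of key_fields."""
    best = {}
    for rec in records:
        key = tuple(rec[f] for f in key_fields)
        # Fixed-width timestamps sort correctly as plain strings.
        if key not in best or rec[order_field] < best[key][order_field]:
            best[key] = rec
    return sorted(best.values(), key=lambda r: r["id"])


fields = ["id", "machine name", "value1", "value2", "date"]
raw = [
    (1, "abc", 10, 34, "2018-09-27 10:40:10"),
    (2, "abc", 10, 34, "2018-09-27 10:41:14"),
    (3, "abc", 10, 34, "2018-09-27 10:42:19"),
    (4, "xyz", 12, 45, "2018-09-27 10:45:19"),
    # Hypothetical extra row (not in the question): same machine, different value1.
    (5, "abc", 11, 34, "2018-09-27 10:50:00"),
]
records = [dict(zip(fields, r)) for r in raw]

# Partition by machine name only: one row per machine.
by_machine = first_per_key(records, ["machine name"])

# Partition by machine name, value1 and value2: row 5 survives as its own group.
by_all = first_per_key(records, ["machine name", "value1", "value2"])

print([r["id"] for r in by_machine])
print([r["id"] for r in by_all])
```

Partitioning on machine name alone drops the hypothetical row 5. Adding value1 and value2 to the key keeps it, because its values differ from the other "abc" rows.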

See this template for further reference.

notNull answered Oct 21 '22 05:10