NiFi: Remove fixed number of header lines from file

I'm processing a file and I'd like to remove (trim) the first X header lines so that only the data remains, ideally without using regular expressions.

Thanks

Filippo Loddo asked Jan 13 '17 18:01


1 Answer

You can remove the first X header lines with the ExecuteScript processor in NiFi.

The following is an example Jython script I wrote for myself:

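```python
# ExecuteScript (Script Engine: python, i.e. Jython): drop the leading header lines.
# `session`, `REL_SUCCESS` are bound by ExecuteScript itself.
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback

HEADER_LINES = 3  # number of leading lines to remove


class PyStreamCallback(StreamCallback):
    def __init__(self):
        pass

    def process(self, inputStream, outputStream):
        # readLines returns the whole content as lines, terminators stripped;
        # copy it into a Python list so slicing is plain Python slicing.
        lines = list(IOUtils.readLines(inputStream, StandardCharsets.UTF_8))
        for line in lines[HEADER_LINES:]:
            # OutputStream.write expects bytes, so encode each line explicitly.
            outputStream.write(bytearray((line + "\n").encode("utf-8")))


flowFile = session.get()
if flowFile is not None:
    flowFile = session.write(flowFile, PyStreamCallback())
    # Optional, specific to my use case: rename the output file.
    # flowFile = session.putAttribute(flowFile, "filename",
    #     flowFile.getAttribute("filename").split(".")[0] + "_translated.json")
    session.transfer(flowFile, REL_SUCCESS)
```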

This removes the first 3 lines; change the number 3 in the script to remove more or fewer lines. Note that the whole file is read into memory, which is fine for small and medium files.
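If you want to check the trimming logic outside NiFi, the same idea can be sketched in plain CPython. This is a minimal sketch and not part of the ExecuteScript script: `drop_header_lines` is a hypothetical helper name, and it assumes binary file-like objects (such as `io.BytesIO` or a file opened with `'rb'`). It uses `itertools.islice` to skip the first N lines lazily, so unlike `IOUtils.readLines` it does not load the whole file into memory and keeps the original line endings.

```python
import io
from itertools import islice


def drop_header_lines(input_stream, output_stream, header_lines=3):
    """Copy input_stream to output_stream, skipping the first header_lines lines.

    Hypothetical helper for illustration; expects binary file-like objects.
    Iterates line by line, so the file is never held in memory as a whole.
    """
    if header_lines < 0:
        raise ValueError("header_lines must be non-negative")
    # islice(stream, n, None) skips the first n lines and yields the rest lazily;
    # each yielded line still carries its own terminator (b"\n", b"\r\n", ...).
    for line in islice(input_stream, header_lines, None):
        output_stream.write(line)


if __name__ == "__main__":
    data = b"header 1\nheader 2\nheader 3\nrow 1\nrow 2\n"
    src, dst = io.BytesIO(data), io.BytesIO()
    drop_header_lines(src, dst, header_lines=3)
    print(dst.getvalue())  # b'row 1\nrow 2\n'
```

NiFi hands a Jython script a Java `InputStream`, which is not a Python line iterator, so this function is not a drop-in replacement inside ExecuteScript; it only illustrates the line-skipping logic.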

Hope that helps.

Biplob Biswas answered Oct 31 '22 05:10