I have a streaming process in Flink that reads CSV files from a single path. I want to know the file name of each processed file.
I am currently using this code to read the CSV files in the path (dataPath):
```scala
val recs: DataStream[CallCenterEvent] = env
  .readFile[CallCenterEvent](
    CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
    dataPath,
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    c._2.fileInterval)
  .uid("source-%s-%s".format(systemConfig.name, c._1))
  .name("%s records reading".format(c._1))
```
And this function to build the TupleCsvInputFormat:
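```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.TupleCsvInputFormat
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.core.fs.Path
import scala.reflect.ClassTag

def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath: String, conf: URMConfiguration): TupleCsvInputFormat[T] = {
  val typeInfo = implicitly[TypeInformation[T]]
  val format = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
  // Only enable quoted-string parsing when a quote character is configured
  if (conf.quoteCharacter != null && conf.quoteCharacter.nonEmpty)
    format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
  format.setFieldDelimiter(conf.fieldDelimiter)
  format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
  format.setLenient(true) // skip malformed lines instead of failing the job
  format
}
```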
The process runs fine, but I can't find a way to get the name of each CSV file being processed.
Thanks in advance
I faced a similar situation where I needed to know the file name of the record being processed: the file name carries some information that is not available inside the record, and asking the customer to change the record schema is not an option.
I found a way to get access to the underlying source. In my case it is the FileInputSplit, which holds the Path of the source data file. FileInputFormat keeps the split being read in the protected field currentSplit, so a subclass can read it in readRecord and append the file name to each record:
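```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

class MyTextInputFormat(p: Path) extends TextInputFormat(p) {
  override def readRecord(reusable: String, bytes: Array[Byte], offset: Int, numBytes: Int): String = {
    // currentSplit is the FileInputSplit being read; its path ends with the source file name
    val fileName =
      if (this.currentSplit != null) this.currentSplit.getPath.getName
      else "unknown-file-path"
    // Add the file name to the record as an extra trailing field
    super.readRecord(reusable, bytes, offset, numBytes) + "," + fileName
  }
}
```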
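The fallback and the concatenation inside readRecord can also be pulled out into a small pure function, so the tagging logic can be unit tested without running a Flink job. This is only a sketch: the object name FileNameTag and its parameters are hypothetical, and splitPathName stands in for currentSplit.getPath.getName (None when no split is assigned).

```scala
object FileNameTag {
  // Fallback used when the input format has no current split (same value as in readRecord above)
  val UnknownFile = "unknown-file-path"

  // Appends the source file name to a record as one extra trailing field.
  // `splitPathName` is a hypothetical stand-in for currentSplit.getPath.getName.
  def tag(record: String, splitPathName: Option[String], delimiter: String = ","): String =
    record + delimiter + splitPathName.getOrElse(UnknownFile)
}
```

Inside MyTextInputFormat.readRecord it could then be called as `FileNameTag.tag(super.readRecord(reusable, bytes, offset, numBytes), Option(currentSplit).map(_.getPath.getName))`, where `Option(...)` turns a null split into None.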
Now you can use it in the stream setup:
```scala
val format = new MyTextInputFormat(new Path(srcDir))
format.setDelimiter(prfl.lineSep)
// Re-scan srcDir for new files every 10 seconds
val stream = env.readFile(format, srcDir, FileProcessingMode.PROCESS_CONTINUOUSLY, Time.seconds(10).toMilliseconds)
```
While my situation is a little different, this approach should help you as well!
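Applied to the question, the tagged lines still have to be split back into the CSV payload and the file name downstream. A minimal sketch, assuming the tagging above (file name appended after the last delimiter). The names TaggedLine and TaggedLineParser are hypothetical, and the field splitting is naive: unlike TupleCsvInputFormat with enableQuotedStringParsing, it does not handle quoted fields that contain the delimiter.

```scala
import java.util.regex.Pattern

// Hypothetical record type: the CSV fields of one line plus the name of the file it came from.
final case class TaggedLine(fields: Vector[String], fileName: String)

object TaggedLineParser {
  // Splits "<csv payload><delimiter><file name>" at the LAST delimiter, so the
  // payload may contain the delimiter but the file name must not.
  // Returns None for a line that contains no delimiter at all.
  def parse(line: String, delimiter: Char = ','): Option[TaggedLine] = {
    val cut = line.lastIndexOf(delimiter)
    if (cut < 0) None
    else {
      val payload = line.substring(0, cut)
      val fileName = line.substring(cut + 1)
      // Naive split: Pattern.quote makes the delimiter literal, limit -1 keeps
      // trailing empty fields; quoted delimiters are NOT handled.
      val fields = payload.split(Pattern.quote(delimiter.toString), -1).toVector
      Some(TaggedLine(fields, fileName))
    }
  }
}
```

In the job this could be applied as `stream.flatMap(line => TaggedLineParser.parse(line).toList)` with `import org.apache.flink.streaming.api.scala._` in scope for the TypeInformation; the resulting fields would still need converting to CallCenterEvent.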