
Get file name of DataStream with Flink

I have a Flink streaming job that processes CSV files from a single path. I want to know the file name of each processed file.

I currently read the CSV files from the path (dataPath) like this:

val recs:DataStream[CallCenterEvent] = env
          .readFile[CallCenterEvent](
          CsvReader.getReaderFormat[CallCenterEvent](dataPath, c._2),
          dataPath,
          FileProcessingMode.PROCESS_CONTINUOUSLY,
          c._2.fileInterval)
          .uid("source-%s-%s".format(systemConfig.name, c._1))
          .name("%s records reading".format(c._1))

And I use this function to obtain the TupleCsvInputFormat:

def getReaderFormat[T <: Product : ClassTag : TypeInformation](dataPath:String, conf:URMConfiguration): TupleCsvInputFormat[T] = {
  val typeInfo = implicitly[TypeInformation[T]]
  val format: TupleCsvInputFormat[T] = new TupleCsvInputFormat[T](new Path(dataPath), typeInfo.asInstanceOf[CaseClassTypeInfo[T]])
  if (conf.quoteCharacter != null && !conf.quoteCharacter.equals(""))
    format.enableQuotedStringParsing(conf.quoteCharacter.charAt(0))
  format.setFieldDelimiter(conf.fieldDelimiter)
  format.setSkipFirstLineAsHeader(conf.ignoreFirstLine)
  format.setLenient(true)

  format
}

The process runs fine, but I can't find a way to get the file name of each CSV file processed.

Thanks in advance

Roizo asked Oct 10 '17 16:10


1 Answer

I faced a similar situation where I needed to know the file name for the record being processed. The file name contains some information that is not available inside the record, and asking the customer to change the record schema was not an option.

I found a way to access the underlying source. In my case it is a FileInputSplit, which carries the Path of the source data file:

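import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

class MyTextInputFormat(p: Path) extends TextInputFormat(p) {

  override def readRecord(reusable: String, bytes: Array[Byte], offset: Int, numBytes: Int): String = {
    // currentSplit is the FileInputSplit being read; fall back to a marker if it is not set
    val fileName =
      if (this.currentSplit != null) this.currentSplit.getPath.getName
      else "unknown-file-path"

    // Add the file name to the record as a trailing comma-separated field
    super.readRecord(reusable, bytes, offset, numBytes) + "," + fileName
  }
}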

Now you can use this format when setting up the stream:

val format = new MyTextInputFormat(new Path(srcDir))
format.setDelimiter(prfl.lineSep)
val stream = env.readFile(format, srcDir, FileProcessingMode.PROCESS_CONTINUOUSLY, Time.seconds(10).toMilliseconds)
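
Since the format above appends the file name after the last comma, a downstream step has to split it off again. Here is a minimal sketch of how that could look. `FileNameTag` and `splitFileName` are hypothetical names of my own, not part of Flink, and the sketch assumes the file name itself contains no comma:

```scala
// Hypothetical helper (not a Flink API): undoes the "record,fileName" tagging
// done in MyTextInputFormat.readRecord.
object FileNameTag {

  // Returns (original record, file name).
  // Assumption: the file name has no comma, so the last comma is the separator.
  def splitFileName(line: String): (String, String) = {
    val idx = line.lastIndexOf(',')
    if (idx < 0) (line, "unknown-file-path") // line was not tagged
    else (line.substring(0, idx), line.substring(idx + 1))
  }
}
```

In the job this could probably be applied with a map over the stream, for example `stream.map(line => FileNameTag.splitFileName(line))`, assuming the Scala API implicits are in scope. If file names may contain commas, a separator that cannot appear in either part would be a safer choice.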

While my situation is a little different, this approach should help you as well!

Bon Speedy answered Oct 30 '22 04:10
