Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

UDF to extract only the file name from path in Spark SQL

There is input_file_name function in Apache Spark which is used by me to add new column to Dataset with the name of file which is currently being processed.

The problem is that I'd like to somehow customize this function to return only file name, ommitting the full path to it on s3.

For now, I am doing replacement of the path on the second step using map function:

val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
  val baseName: String = FilenameUtils.getBaseName(fileName)
  val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
  this.valueOf(tmpFileName)
}

But I'd like to use something like

val initialDs = spark.sqlContext.read
    .option("dateFormat", conf.dateFormat)
    .schema(conf.schema)
    .csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
like image 814
max.kuzmentsov Avatar asked Nov 28 '16 16:11

max.kuzmentsov


2 Answers

In Scala:

#register udf
spark.udf
  .register("get_only_file_name", (fullPath: String) => fullPath.split("/").last)

#use the udf to get last token(filename) in full path
val initialDs = spark.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", get_only_file_name(input_file_name))

Edit: In Java as per comment

#register udf
spark.udf()
  .register("get_only_file_name", (String fullPath) -> {
     int lastIndex = fullPath.lastIndexOf("/");
     return fullPath.substring(lastIndex, fullPath.length - 1);
    }, DataTypes.StringType);

import org.apache.spark.sql.functions.input_file_name    

#use the udf to get last token(filename) in full path
Dataset<Row> initialDs = spark.read()
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", get_only_file_name(input_file_name()));
like image 51
mrsrinivas Avatar answered Nov 03 '22 01:11

mrsrinivas


Borrowing from a related question here, the following method is more portable and does not require a custom UDF.

Spark SQL Code Snippet: reverse(split(path, '/'))[0]

Spark SQL Sample:

WITH sample_data as (
SELECT 'path/to/my/filename.txt' AS full_path
)
SELECT
      full_path
    , reverse(split(full_path, '/'))[0] as basename
FROM sample_data

Explanation: The split() function breaks the path into it's chunks and reverse() puts the final item (the file name) in front of the array so that [0] can extract just the filename.

Full Code example here :

  spark.sql(
    """
      |WITH sample_data as (
      |    SELECT 'path/to/my/filename.txt' AS full_path
      |  )
      |  SELECT
      |  full_path
      |  , reverse(split(full_path, '/'))[0] as basename
      |  FROM sample_data
      |""".stripMargin).show(false)

Result :

+-----------------------+------------+
|full_path              |basename    |
+-----------------------+------------+
|path/to/my/filename.txt|filename.txt|
+-----------------------+------------+
like image 21
aaronsteers Avatar answered Nov 03 '22 00:11

aaronsteers