
Reading CSV into a Spark Dataframe with timestamp and date types

I am on CDH with Spark 1.6.

I am trying to import this hypothetical CSV into an Apache Spark DataFrame:

$ hadoop fs -cat test.csv
a,b,c,2016-09-09,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10,a,2016-11-11 09:09:10.0,a

I use the Databricks spark-csv jar.

val textData = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

I use inferSchema to build the schema for the resulting DataFrame. For the code above, printSchema() gives the following output:

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: string (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+

The C3 column is inferred as a string, but I want it to be a date. To get a date type, I tried the following code:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")

scala> textData.printSchema
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: timestamp (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)

scala> textData.show()
+---+---+---+--------------------+---+--------------------+---+
| C0| C1| C2|                  C3| C4|                  C5| C6|
+---+---+---+--------------------+---+--------------------+---+
|  a|  b|  c|2016-09-09 00:00:...|  a|2016-11-11 00:00:...|  a|
|  a|  b|  c|2016-09-10 00:00:...|  a|2016-11-11 00:00:...|  a|
+---+---+---+--------------------+---+--------------------+---+

The only difference between this code and the first block is the dateFormat option (I use "yyyy-MM-dd" instead of "yyyy-MM-dd HH:mm:ss"). Now both C3 and C5 are inferred as timestamps, so C3 is still not a date. For C5, the HH:mm:ss part is ignored and shows up as zeroes in the data.
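The behaviour of both attempts can be reproduced outside Spark. The following is a minimal sketch, assuming spark-csv passes the dateFormat pattern to java.text.SimpleDateFormat (I believe this is the case for the 1.x releases, but check the version you run). DateFormatDemo and parseWith are hypothetical names used only for this illustration.

```scala
import java.text.SimpleDateFormat
import scala.util.Try

object DateFormatDemo {
  // Parse `value` with `pattern`; on success, render the result as a full
  // timestamp string so that any dropped time part becomes visible.
  // Returns None when the value does not match the pattern.
  def parseWith(pattern: String, value: String): Option[String] = {
    val parser = new SimpleDateFormat(pattern)
    val printer = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // SimpleDateFormat.parse reads from the start of the string and does not
    // require the whole string to be consumed.
    Try(parser.parse(value)).toOption.map(d => printer.format(d))
  }
}
```

With this sketch, parseWith("yyyy-MM-dd", "2016-11-11 09:09:09.0") yields Some("2016-11-11 00:00:00"), because the short pattern stops after the day and the time is left at midnight. parseWith("yyyy-MM-dd HH:mm:ss", "2016-09-09") yields None, which is consistent with C3 falling back to string in the first attempt.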

Ideally, I want C3 to be of type date and C5 to be of type timestamp with its HH:mm:ss part preserved. My current solution looks like this: I generate the CSV by pulling data in parallel from my DB, and I make sure that I pull all dates as timestamps (not ideal). The test CSV now looks like this:

$ hadoop fs -cat new-test.csv
a,b,c,2016-09-09 00:00:00,a,2016-11-11 09:09:09.0,a
a,b,c,2016-09-10 00:00:00,a,2016-11-11 09:09:10.0,a

This is my final working code:

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd HH:mm:ss")
    .schema(finalSchema)
    .option("nullValue", "null")
    .load("new-test.csv")

Here, I use the complete timestamp format ("yyyy-MM-dd HH:mm:ss") in dateFormat. I manually create the finalSchema instance, in which C3 is DateType and C5 is TimestampType (Spark SQL types), and apply it with the schema() function. The output looks as follows:

scala> finalSchema
res4: org.apache.spark.sql.types.StructType = StructType(StructField(C0,StringType,true), StructField(C1,StringType,true), StructField(C2,StringType,true), StructField(C3,DateType,true), StructField(C4,StringType,true), StructField(C5,TimestampType,true), StructField(C6,StringType,true))

scala> textData.printSchema()
root
 |-- C0: string (nullable = true)
 |-- C1: string (nullable = true)
 |-- C2: string (nullable = true)
 |-- C3: date (nullable = true)
 |-- C4: string (nullable = true)
 |-- C5: timestamp (nullable = true)
 |-- C6: string (nullable = true)


scala> textData.show()
+---+---+---+----------+---+--------------------+---+
| C0| C1| C2|        C3| C4|                  C5| C6|
+---+---+---+----------+---+--------------------+---+
|  a|  b|  c|2016-09-09|  a|2016-11-11 09:09:...|  a|
|  a|  b|  c|2016-09-10|  a|2016-11-11 09:09:...|  a|
+---+---+---+----------+---+--------------------+---+
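The question does not show how finalSchema was built. One way to construct a schema matching the printed StructType is sketched below; the wrapping object CsvSchema is a hypothetical name for illustration, and in spark-shell the val can be defined directly. It assumes the Spark SQL library is on the classpath.

```scala
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType, TimestampType}

object CsvSchema {
  // Column names follow spark-csv's default C0..C6 naming for headerless files.
  val finalSchema: StructType = StructType(Seq(
    StructField("C0", StringType, nullable = true),
    StructField("C1", StringType, nullable = true),
    StructField("C2", StringType, nullable = true),
    StructField("C3", DateType, nullable = true),      // date only
    StructField("C4", StringType, nullable = true),
    StructField("C5", TimestampType, nullable = true), // date and time
    StructField("C6", StringType, nullable = true)
  ))
}
```

Passing CsvSchema.finalSchema to .schema(...) replaces inference entirely, so the inferSchema option is not needed in that read.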

Is there an easier or out-of-the-box way to parse a CSV file that has both date and timestamp columns into a Spark DataFrame?

Relevant Links:
http://spark.apache.org/docs/latest/sql-programming-guide.html#manually-specifying-options
https://github.com/databricks/spark-csv

Mihir Shinde asked Nov 30 '16 00:11




2 Answers

With the inferSchema option, non-trivial cases will probably not give the expected result, as you can see in InferSchema.scala:

if (field == null || field.isEmpty || field == nullValue) {
  typeSoFar
} else {
  typeSoFar match {
    case NullType => tryParseInteger(field)
    case IntegerType => tryParseInteger(field)
    case LongType => tryParseLong(field)
    case DoubleType => tryParseDouble(field)
    case TimestampType => tryParseTimestamp(field)
    case BooleanType => tryParseBoolean(field)
    case StringType => StringType
    case other: DataType =>
      throw new UnsupportedOperationException(s"Unexpected data type $other")
  }
}

It only tries to match each column against a timestamp type, never a date type, so an out-of-the-box solution is not possible in this case. In my experience, the easier solution is to define the schema directly with the types you need. This also prevents inference from settling on a type that fits only the RDD it evaluated rather than the entire data set. Your final schema is an efficient solution.

Jader Martins answered Oct 18 '22 15:10


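It's not really elegant, but you can convert the inferred timestamp column to a date after loading (see the last line). Note that with dateFormat "yyyy-MM-dd" the time part of C5 is still zeroed, as described in the question.

import org.apache.spark.sql.functions.expr

val textData = sqlContext.read.format("com.databricks.spark.csv")
    .option("header", "false")
    .option("delimiter", ",")
    .option("dateFormat", "yyyy-MM-dd")
    .option("inferSchema", "true")
    .option("nullValue", "null")
    .load("test.csv")
    // C3 is the date column (inferred as timestamp); to_date narrows it to a date
    .withColumn("C3", expr("to_date(C3)"))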
Carlos Verdes answered Oct 18 '22 14:10