
Parsing date time information from CSV in Zeppelin and Spark

I'm trying to read a CSV file and build a DataFrame from it.

The CSV looks like the sample below. I used an ISO 8601-style format for the date/time strings.

2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1
2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1
......
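The timestamps above are not zero-padded (for example `12:0:5`), so the parsing pattern has to accept one- or two-digit fields. As a minimal sketch, using the JDK's `java.time` instead of Joda-Time and assuming the values are UTC (the CSV carries no offset), one timestamp string can be turned into epoch milliseconds like this. `TimestampParsing` and `toEpochMillis` are hypothetical names:

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimestampParsing {
  // Single-letter numeric fields accept a variable number of digits when parsing,
  // so "2015-6-29T12:0:5" parses. 'M' is month-of-year, 'm' is minute-of-hour,
  // 'H' is hour-of-day (0-23).
  private val pattern = DateTimeFormatter.ofPattern("yyyy-M-d'T'H:m:s")

  // Hypothetical helper: parse a local timestamp and treat it as UTC (an assumption).
  def toEpochMillis(s: String): Long =
    LocalDateTime.parse(s, pattern).toInstant(ZoneOffset.UTC).toEpochMilli

  def main(args: Array[String]): Unit = {
    val start = toEpochMillis("2015-6-29T12:0:0")
    val end   = toEpochMillis("2015-6-29T12:5:42")
    println(start)        // 1435579200000
    println(end - start)  // 342000
  }
}
```

A `Long` of epoch milliseconds is a plain value, so it is cheap to store in a case class and to ship between Spark tasks.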

To load this data, I wrote the following Scala code in Zeppelin:

import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import sys.process._

val logCSV = sc.textFile ("log_table.csv")

case class record(
    callingTime:DateTime, 
    userID:String, 
    CID:String, 
    serverConnectionTime:DateTime, 
    serverResponseTime:DateTime, 
    connectedAgentID:String, 
    beginCallingTime:DateTime, 
    endCallingTime:DateTime, 
    Succeed:Int)


// MM = month-of-year (mm would be minutes); HH = hour-of-day 0-23 (kk is clock-hour 1-24)
val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")

val logTable = logCSV.map(s => s.split(",") ).map(
    s => record(
            formatter.parseDateTime( s(0) ), 
            s(1),
            s(2),
            formatter.parseDateTime( s(3) ), 
            formatter.parseDateTime( s(4) ), 
            s(5),
            formatter.parseDateTime( s(6) ), 
            formatter.parseDateTime( s(7) ),            
            s(10).toInt  // Succeed is the last (11th) column of each CSV row
        )
).toDF()

Then it produced the error below. The main issue is that DateTime is not serializable.

logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169
defined class record
formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@46051d99
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
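`Task not serializable` means the function passed to `map` captured something that Java serialization (which Spark uses for closures) cannot write. A quick, Spark-free way to check a candidate object is to try serializing it yourself. In this sketch `java.time.format.DateTimeFormatter` stands in for Joda's `DateTimeFormatter`, which likewise does not implement `java.io.Serializable`, so the `formatter` captured by the closure is worth checking too. `SerializabilityCheck` and `isJavaSerializable` are hypothetical names:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}
import java.time.format.DateTimeFormatter

object SerializabilityCheck {
  // Returns true if plain Java serialization accepts the object.
  def isJavaSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val formatter = DateTimeFormatter.ofPattern("yyyy-M-d'T'H:m:s")
    println(isJavaSerializable(formatter))                              // false
    println(isJavaSerializable(java.lang.Long.valueOf(1435579200000L))) // true
    println(isJavaSerializable(new java.sql.Timestamp(1435579200000L))) // true
  }
}
```

Epoch-millis `Long` values and `java.sql.Timestamp` both pass this check; Spark SQL also maps `java.sql.Timestamp` case-class fields to its `TimestampType`.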

So how should I handle date/time information in Scala with Spark? Could you help me?

Jinho Yoo asked Sep 28 '22 11:09


1 Answer

While a DateTime isn't serializable, if you use the parseMillis method of DateTimeFormatter, you'll get a long (a Scala Long), which is serializable. To get the DateTime back from the Long, use the DateTime(long) constructor, e.g. new DateTime(millis).
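A minimal sketch of the "store millis, not DateTime" idea, with two assumptions: it uses `java.time` in place of Joda's `parseMillis` so it runs with only the JDK, and it builds the formatter inside a function meant for `RDD.mapPartitions`, so no formatter object has to be captured from the driver. `CallRecord`, `PartitionParsing` and `parsePartition` are hypothetical names, and the record keeps only a subset of the columns:

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical subset of the question's columns; times are stored as epoch millis (Long).
case class CallRecord(callingTimeMillis: Long, userID: String, CID: String, succeed: Int)

object PartitionParsing {
  // Intended for RDD.mapPartitions: the formatter is created once per partition,
  // inside the function, rather than being serialized with the closure.
  def parsePartition(lines: Iterator[String]): Iterator[CallRecord] = {
    val pattern = DateTimeFormatter.ofPattern("yyyy-M-d'T'H:m:s")
    // Assumes the timestamps are UTC, since the CSV carries no offset.
    def millis(s: String): Long =
      LocalDateTime.parse(s, pattern).toInstant(ZoneOffset.UTC).toEpochMilli
    lines.map { line =>
      val f = line.split(",")
      // Column 0 is callingTime; the last column is the success flag.
      CallRecord(millis(f(0)), f(1), f(2), f(f.length - 1).trim.toInt)
    }
  }

  def main(args: Array[String]): Unit = {
    val sample = Iterator(
      "2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1"
    )
    parsePartition(sample).foreach(println)
  }
}
```

In the notebook this would look roughly like `logCSV.mapPartitions(PartitionParsing.parsePartition _).toDF()`, assuming `sqlContext.implicits._` is in scope. With Joda-Time the structure is the same: call `formatter.parseMillis(...)` inside the partition function and use `new DateTime(millis)` wherever a `DateTime` is needed again.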

hd1 answered Oct 03 '22 06:10