I'm trying to read a CSV file and build a DataFrame from it.
The CSV looks like the sample below. I used the ISO 8601 date/time format for the date/time strings.
```
2015-6-29T12:0:0,b82debd63cffb1490f8c9c647ca97845,G1J8RX22EGKP,2015-6-29T12:0:5,2015-6-29T12:0:6,0QA97RAM1GIV,2015-6-29T12:0:10,2015-6-29T12:0:11,2015-6-29T12:0:12,2015-6-29T12:5:42,1
2015-6-29T12:20:0,0d60c871bd9180275f1e4104d4b7ded0,5HNB7QZSUI2C,2015-6-29T12:20:5,2015-6-29T12:20:6,KSL2LB0R6367,2015-6-29T12:20:10,2015-6-29T12:20:11,2015-6-29T12:20:12,2015-6-29T12:25:13,1
......
```
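The timestamps in this sample are not zero-padded (`2015-6-29T12:0:0` rather than `2015-06-29T12:00:00`), so strictly speaking they are not ISO 8601 and need a custom pattern. As a minimal sketch, assuming the times are UTC, this uses the JDK's own `java.time` API (not the Joda-Time library the question uses) to turn one of these strings into epoch milliseconds; `TimestampParsing` and `toEpochMillis` are illustrative names, not part of any library.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TimestampParsing {
  // Single-letter numeric fields accept one or more digits when parsing,
  // so unpadded values such as "6" (month) or "0" (minute) are read correctly.
  // Case matters: M = month-of-year, m = minute-of-hour, H = hour-of-day (0-23).
  val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-M-d'T'H:m:s")

  // Assumption: the log timestamps are UTC, since the CSV carries no offset.
  def toEpochMillis(s: String): Long =
    LocalDateTime.parse(s, formatter).toInstant(ZoneOffset.UTC).toEpochMilli
}
```

For example, `TimestampParsing.toEpochMillis("2015-6-29T12:0:5") - TimestampParsing.toEpochMillis("2015-6-29T12:0:0")` is `5000`. Joda patterns use the same letters for month and minute, so `M` versus `m` is worth checking in either library.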
To load this data, I wrote the following Scala code in Zeppelin:
```scala
import org.apache.spark.sql.types.DateType
import org.apache.spark.sql.functions._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import sys.process._

val logCSV = sc.textFile("log_table.csv")

case class record(
  callingTime: DateTime,
  userID: String,
  CID: String,
  serverConnectionTime: DateTime,
  serverResponseTime: DateTime,
  connectedAgentID: String,
  beginCallingTime: DateTime,
  endCallingTime: DateTime,
  Succeed: Int)

// MM is month-of-year (mm would be minute-of-hour); HH is hour-of-day (0-23)
val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss")

val logTable = logCSV.map(s => s.split(",")).map(
  s => record(
    formatter.parseDateTime(s(0)),
    s(1),
    s(2),
    formatter.parseDateTime(s(3)),
    formatter.parseDateTime(s(4)),
    s(5),
    formatter.parseDateTime(s(6)),
    formatter.parseDateTime(s(7)),
    s(8).toInt
  )
).toDF()
```
Then I got the error below. The main issue seems to be that DateTime is not serializable.
```
logCSV: org.apache.spark.rdd.RDD[String] = log_table.csv MapPartitionsRDD[38] at textFile at <console>:169
defined class record
formatter: org.joda.time.format.DateTimeFormatter = org.joda.time.format.DateTimeFormatter@46051d99
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
```
So how should I handle date/time information in Scala? Could you help me?
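Instead of keeping DateTime values in the record, use the parseMillis method of DateTimeFormatter. It returns the timestamp as epoch milliseconds in a Long, which is serializable and can be stored directly in a DataFrame column. When you need a DateTime again, rebuild it with the `new DateTime(millis)` constructor. Note that Joda's DateTimeFormatter itself is not serializable either, so create it inside the closure (for example once per partition with mapPartitions) rather than capturing a formatter defined on the driver.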
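One way to put this together is sketched below, under several stated assumptions. It swaps Joda's `parseMillis` for the equivalent `java.time` calls so the sketch needs no extra dependency (with Joda, `formatter.parseMillis(s)` and `new DateTime(millis)` play the same roles); it treats the timestamps as UTC; it takes the success flag from the last column of each row; and `LogRecord`, `LogParsing`, `parsePartition` and `toInstant` are hypothetical names. The formatter is created inside the per-partition function, so no formatter instance has to be serialized and shipped from the driver.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

// Timestamps are kept as epoch milliseconds (Long) instead of date-time objects,
// so every field is a plain serializable value that Spark SQL can map to a column.
case class LogRecord(
  callingTime: Long,
  userID: String,
  CID: String,
  serverConnectionTime: Long,
  serverResponseTime: Long,
  connectedAgentID: String,
  beginCallingTime: Long,
  endCallingTime: Long,
  Succeed: Int)

object LogParsing {
  // Hypothetical helper meant to be passed to mapPartitions: it parses all lines
  // of one partition, building the (non-serializable) formatter locally.
  def parsePartition(lines: Iterator[String]): Iterator[LogRecord] = {
    val fmt = DateTimeFormatter.ofPattern("yyyy-M-d'T'H:m:s")

    // Assumption: timestamps are UTC.
    def millis(s: String): Long =
      LocalDateTime.parse(s, fmt).toInstant(ZoneOffset.UTC).toEpochMilli

    lines.map { line =>
      val c = line.split(",")
      LogRecord(
        millis(c(0)), c(1), c(2),
        millis(c(3)), millis(c(4)), c(5),
        millis(c(6)), millis(c(7)),
        // Assumption: the success flag is the last column of the row.
        c.last.trim.toInt)
    }
  }

  // Turns a stored Long back into a point in time when one is needed.
  def toInstant(millis: Long): Instant = Instant.ofEpochMilli(millis)
}
```

In the notebook this would be wired up roughly as `logCSV.mapPartitions(lines => LogParsing.parsePartition(lines)).toDF()`, with the SQL implicits in scope as in the question. If the notebook still reports `Task not serializable` because the closure drags in the REPL's wrapper objects, compiling `LogRecord` and `LogParsing` into a jar on the interpreter's classpath is a common workaround.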