The exception message is as follows:
User class threw exception: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 11, 10.215.155.82): java.lang.NullPointerException at org.joda.time.tz.CachedDateTimeZone.getInfo(CachedDateTimeZone.java:143) at org.joda.time.tz.CachedDateTimeZone.getOffset(CachedDateTimeZone.java:103) at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:676) at org.joda.time.format.DateTimeFormatter.printTo(DateTimeFormatter.java:521) at org.joda.time.format.DateTimeFormatter.print(DateTimeFormatter.java:625) at org.joda.time.base.AbstractDateTime.toString(AbstractDateTime.java:328) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3$$anonfun$apply$1.apply(DateTimeNullReferenceReappear.scala:41) at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:328) at scala.collection.TraversableLike$$anonfun$groupBy$1.apply(TraversableLike.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at org.apache.spark.util.collection.CompactBuffer$$anon$1.foreach(CompactBuffer.scala:113) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at org.apache.spark.util.collection.CompactBuffer.foreach(CompactBuffer.scala:28) at scala.collection.TraversableLike$class.groupBy(TraversableLike.scala:327) at org.apache.spark.util.collection.CompactBuffer.groupBy(CompactBuffer.scala:28) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:41) at com.xxx.ieg.face.demo.DateTimeNullReferenceReappear$$anonfun$3.apply(DateTimeNullReferenceReappear.scala:40) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:1081) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1314) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)
My code is as follows:
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.{ SparkConf, SparkContext }
import org.joda.time.DateTime
import org.joda.time.format.{ DateTimeFormat, DateTimeFormatter }
object DateTimeNullReferenceReappear extends App {
case class Record(uin: String = "", date: DateTime = null, value: Double = 0.0)
val cfg = new Configuration
val sparkConf = new SparkConf()
sparkConf.setAppName("bourne_exception_reappear")
val sc = new SparkContext(sparkConf)
val data = TDWSparkContext.tdwTable( // this function just reads data from a data warehouse
sc,
tdwuser = FaceConf.TDW_USER,
tdwpasswd = FaceConf.TDW_PASSWORD,
dbName = "my_db",
tblName = "my_table",
parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
.map(row => {
Record(uin = row(2),
date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
value = row(4).toDouble)
}).map(x => (x.uin, (x.date, x.value)))
.groupByKey
.map(x => {
x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throws the exception here
})
// val data = TDWSparkContext.tdwTable( // This version works, because I don't call DateTime.toString inside the groupBy
// sc,
// tdwuser = FaceConf.TDW_USER,
// tdwpasswd = FaceConf.TDW_PASSWORD,
// dbName = "hy",
// tblName = "t_dw_cf_oss_tblogin",
// parts = Array("p_20150323", "p_20150324", "p_20150325", "p_20150326", "p_20150327", "p_20150328", "p_20150329"))
// .map(row => {
// Record(uin = row(2),
// date = DateTimeFormat.forPattern("yyyyMMdd").parseDateTime(row(0)),
// value = row(4).toDouble)
// }).map(x => (x.uin, (x.date.toString("yyyyMMdd"), x.value)))
// .groupByKey
// .map(x => {
// x._2.groupBy(_._1).mapValues(_.map(_._2).sum)
// })
data.take(10).map(println)
}
So it seems that calling toString inside the groupBy causes the exception. Can anybody explain why?
Thanks
The problem here is bad serialization of Joda's CachedDateTimeZone: it includes a transient field that doesn't get serialized, so it remains null in the deserialized object.
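To make the mechanism concrete, here is a minimal sketch of how a transient field ends up null after a round trip. It uses plain Java serialization and a hypothetical stand-in class (`ZoneLike`, with a made-up `cache` field), not Joda or Kryo themselves, so it only illustrates the general effect described above:

```scala
import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }

// Hypothetical stand-in for a class that keeps a cache in a transient field.
class ZoneLike(val id: String) extends Serializable {
  // Marked transient, so it is skipped when the object is written out.
  @transient val cache: Array[String] = new Array[String](4)
}

object TransientDemo {
  // Serializes a value to bytes and reads it back, as a shuffle between JVMs would.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

After `TransientDemo.roundTrip(new ZoneLike("UTC"))`, the copy still has its `id`, but `cache` is null because the constructor is not re-run on deserialization. Any method that dereferences such a field without re-initializing it would then throw a NullPointerException.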
You can create and register your own Serializer that handles this object properly:
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.joda.time.DateTimeZone;
import org.joda.time.tz.CachedDateTimeZone;
public class JodaCachedDateTimeZoneSerializer extends Serializer<CachedDateTimeZone> {
public JodaCachedDateTimeZoneSerializer() {
setImmutable(true);
}
@Override
public CachedDateTimeZone read(final Kryo kryo, final Input input, final Class<CachedDateTimeZone> type) {
// reconstruct from serialized ID:
final String id = input.readString();
return CachedDateTimeZone.forZone(DateTimeZone.forID(id));
}
@Override
public void write(final Kryo kryo, final Output output, final CachedDateTimeZone cached) {
// serialize ID only:
output.writeString(cached.getID());
}
}
Then, in your class extending KryoRegistrator, add:
kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
This way you don't have to disable Kryo or refrain from using Joda.
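For completeness, a sketch of how the registration might be wired up end to end. The names `JodaKryoRegistrator` and `KryoSetup` are hypothetical; `spark.serializer` and `spark.kryo.registrator` are the standard Spark settings for enabling Kryo and pointing it at a registrator, and `JodaCachedDateTimeZoneSerializer` is assumed to be on the classpath as defined above:

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator
import org.joda.time.tz.CachedDateTimeZone

// Hypothetical registrator; it reuses the custom serializer from this answer.
class JodaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[CachedDateTimeZone], new JodaCachedDateTimeZoneSerializer())
  }
}

object KryoSetup {
  // Builds a SparkConf that enables Kryo and points it at the registrator.
  def conf(): SparkConf = new SparkConf()
    .setAppName("bourne_exception_reappear")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryo.registrator", classOf[JodaKryoRegistrator].getName)
}
```

The question's job would then create its context with `new SparkContext(KryoSetup.conf())` instead of the bare SparkConf.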
You need to either disable Kryo, use Kryo JodaTime Serializers, or avoid serializing the DateTime object, i.e. pass around Longs.
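As a rough illustration of the last option, the date can travel through the shuffle as epoch milliseconds and only be formatted back to a string inside the grouping. This sketch leaves Spark out and uses hypothetical sample rows; pinning the formatter to UTC is my assumption, made so that parsing and printing agree regardless of each JVM's default time zone:

```scala
import org.joda.time.format.DateTimeFormat

object EpochMillisGrouping {
  // Formatter pinned to UTC (an assumption) so parse and print are symmetric.
  private val fmt = DateTimeFormat.forPattern("yyyyMMdd").withZoneUTC()

  // Hypothetical rows: (uin, date string, value).
  val rows: Seq[(String, String, Double)] = Seq(
    ("u1", "20150323", 1.0),
    ("u1", "20150323", 2.5),
    ("u1", "20150324", 4.0))

  // Carry the date as a Long instead of a DateTime, so no Joda object is serialized.
  val keyed: Seq[(String, (Long, Double))] =
    rows.map { case (uin, day, value) => (uin, (fmt.parseMillis(day), value)) }

  // Turn the Long back into a day string only at grouping time, then sum per day.
  def sumByDay(values: Iterable[(Long, Double)]): Map[String, Double] =
    values
      .groupBy { case (millis, _) => fmt.print(millis) }
      .map { case (day, vs) => day -> vs.map(_._2).sum }
}
```

In the Spark job, `sumByDay` would be applied to `x._2` after `groupByKey`, with `(x.uin, (x.date.getMillis, x.value))` as the pair emitted before the shuffle.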
We don't know much about the problem yet, so we can try the following experiment to learn more about it.
Replace the following part:
map(x => {
x._2.groupBy(_._1.toString("yyyyMMdd")).mapValues(_.map(_._2).sum) // throw exception here
})
with this:
// needs: import scala.util.{ Failure, Success, Try } at the top of the file
map( x => {
x._2.groupBy( t => {
// t is (date, value), so the DateTime is t._1
val dateStringTry = Try( t._1.toString( "yyyyMMdd" ) )
dateStringTry match {
case Success( dateString ) => Right( dateString )
case Failure( e ) => {
println( "=========== Null Tuple Description ==========" )
println( "Problem Tuple :: [" + t + "]" )
println( "Error Info :: [" + e.getMessage + "]" )
// finally the stack trace, if needed
// e.printStackTrace()
println( "=============================================" )
Left( e )
}
}
} )
} )
Let's check the result of running this experiment.