I am trying to persist my RDD using off-heap storage on Spark 1.4.0 with Tachyon 0.6.4, like this:
    val a = sqlContext.parquetFile("a1.parquet")
    a.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP)
    a.count()
Afterwards I get the following exception. Any ideas what could be causing it?
15/06/16 10:14:53 INFO : Tachyon client (version 0.6.4) is trying to connect master @ localhost/127.0.0.1:19998
15/06/16 10:14:53 INFO : User registered at the master localhost/127.0.0.1:19998 got UserId 3
15/06/16 10:14:53 INFO TachyonBlockManager: Created tachyon directory at /tmp_spark_tachyon/spark-6b2512ab-7bb8-47ca-b6e2-8023d3d7f7dc/driver/spark-tachyon-20150616101453-ded3
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_3 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 INFO BlockManagerInfo: Added rdd_10_1 on ExternalBlockStore on localhost:33548 (size: 0.0 B)
15/06/16 10:14:53 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 5710423667942934352
org.apache.spark.storage.BlockNotFoundException: Block rdd_10_3 not found
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:306)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
    at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
I have also tried the same with a plain text file and was able to persist it in Tachyon. The problem only occurs when persisting a DataFrame originally read from Parquet.
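For reference, here is a minimal sketch of the text-file variant that did persist successfully (the file name is made up; it assumes the usual SparkContext sc):

    // Hedged sketch: the same OFF_HEAP persist applied to an RDD read from a
    // plain text file worked for me (file name is hypothetical).
    val t = sc.textFile("a1.txt")
    t.persist(org.apache.spark.storage.StorageLevel.OFF_HEAP)
    t.count()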
There seems to be a related bug report: https://issues.apache.org/jira/browse/SPARK-10314
Since there is already a pull request attached to it, there is a good chance a fix will land soon.
From this thread, https://groups.google.com/forum/#!topic/tachyon-users/xb8zwqIjIa4, it looks like Spark uses Tachyon's TRY_CACHE write mode, so the data appears to get lost when it is evicted from the cache.
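Until a fix is released, one possible workaround is simply not to route the Parquet-backed DataFrame through Tachyon and to fall back to Spark's own memory/disk caching instead. This is only an illustrative sketch, not a confirmed fix:

    // Hedged workaround sketch: persist on the JVM heap with disk spill-over,
    // so blocks stay under Spark's own block manager and cannot disappear when
    // Tachyon evicts them from its cache.
    val a = sqlContext.parquetFile("a1.parquet")
    a.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK)
    a.count()

This gives up the off-heap benefit, but it should avoid the BlockNotFoundException until SPARK-10314 is resolved.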