I have a RDD of format RDD[((Long, Long), (Long, Long))] and I need to covert or transform into RDD[((Long, Long), (Long, Long, Long, Long))] where second RDD tuple is based on a function from the first RDD.
I am trying to achieve this based map function but, I think am doing something wrong here. Please help me to solve the issue.
Here is the full code:
package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map
class ListItemCorrelation(sc: SparkContext) extends Serializable {
def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
if (dirX.equals(1)) {
if (dirY.equals(1)) {
return (1, 0, 0, 0)
} else {
return (0, 1, 0, 0)
}
} else {
if (dirY.equals(1)) {
return (0, 0, 1, 0)
} else {
return (0, 0, 0, 1)
}
}
}
def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
val userVotes = sc.textFile(votes)
val userVotesPairs = userVotes.map { t =>
val p = t.split(",")
(p(0).toLong, (p(1).toLong, p(2).toLong))
}
val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1))
val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
//More functionality
return result
}
}
object ListItemCorrelation extends Serializable {
def main(args: Array[String]) {
val votes = args(0)
val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
val context = new SparkContext(conf)
val job = new ListItemCorrelation(context)
val results = job.run(votes)
val output = args(1)
results.saveAsTextFile(output)
context.stop()
}
}
When I try to run this script I am getting following error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108) at org.apache.spark.SparkContext.clean(SparkContext.scala:2094) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370) at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) at org.apache.spark.rdd.RDD.map(RDD.scala:369) at com.ranker.correlation.listitem.ListItemCorrelation.run(ListItemCorrelation.scala:34) at com.ranker.correlation.listitem.ListItemCorrelation$.main(ListItemCorrelation.scala:47) at com.ranker.correlation.listitem.ListItemCorrelation.main(ListItemCorrelation.scala) Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext Serialization stack: - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@4248e66b) - field (class: com.ranker.correlation.listitem.ListItemCorrelation, name: sc, type: class org.apache.spark.SparkContext) - object (class com.ranker.correlation.listitem.ListItemCorrelation, com.ranker.correlation.listitem.ListItemCorrelation@270b6b5e) - field (class: com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, name: $outer, type: class com.ranker.correlation.listitem.ListItemCorrelation) - object (class com.ranker.correlation.listitem.ListItemCorrelation$$anonfun$4, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295) ... 12 more
This error happening while executing following line:
var second = first.map(t => ((t._1._1, t._2._1), up_down(t._1._2, t._2._2)))
I am very new to scala, please help me to find the right way to do this.
Put the up_down
method on a companion object. When any class variable is accessed within an RDD closure, the class (and everything in it, like SparkContext) is serialized. Method parameters count as class variables here. Using a static object will get around this:
package com.ranker.correlation.listitem
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import scala.collection.Map
object ListItemCorrelation {
def up_down(dirX: Long, dirY: Long): (Long, Long, Long, Long) = {
if (dirX.equals(1)) {
if (dirY.equals(1)) {
return (1, 0, 0, 0)
} else {
return (0, 1, 0, 0)
}
} else {
if (dirY.equals(1)) {
return (0, 0, 1, 0)
} else {
return (0, 0, 0, 1)
}
}
}
}
class ListItemCorrelation(sc: SparkContext) extends Serializable {
def run(votes: String): RDD[((Long, Long), (Long, Long, Long, Long))] = {
val userVotes = sc.textFile(votes)
val userVotesPairs = userVotes.map { t =>
val p = t.split(",")
(p(0).toLong, (p(1).toLong, p(2).toLong))
}
val jn = userVotesPairs.join(userVotesPairs).values.filter(t => t._1._1.<(t._2._1))
val first = jn.map(t => ((t._1._1, t._2._1), (t._1._2, t._2._2)))
var second = first.map(t => ((t._1._1, t._2._1), ListItemCorrelation.up_down(t._1._2, t._2._2)))
//More functionality
return result
}
}
object ListItemCorrelation extends Serializable {
def main(args: Array[String]) {
val votes = args(0)
val conf = new SparkConf().setAppName("SparkJoins").setMaster("local")
val context = new SparkContext(conf)
val job = new ListItemCorrelation(context)
val results = job.run(votes)
val output = args(1)
results.saveAsTextFile(output)
context.stop()
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With