I am learning Spark Streaming, and I have a program that is supposed to find the top 5 words.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Top5 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("AppName")
    conf.setMaster("spark://SparkMaster:7077")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Host and port are separate arguments: "SparkMaster:7077" is not a valid
    // hostname, and the socket source should use its own port (9999), not the
    // master port.
    val hottestStream = ssc.socketTextStream("SparkMaster", 9999)

    // split("") would split each line into single characters; split(" ") takes
    // the second whitespace-separated field of each line as the word.
    val searchPair = hottestStream.map(_.split(" ")(1)).map(item => (item, 1))
    val hottestDStream = searchPair.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60), Seconds(20))

    hottestDStream.transform { hottestItemRDD =>
      // Swap to (count, word), sort descending, swap back, and keep the top 5
      // (take(3) would only keep three).
      val top5 = hottestItemRDD.map(pair => (pair._2, pair._1)).sortByKey(false)
        .map(pair => (pair._2, pair._1)).take(5)
      top5.foreach(println)
      hottestItemRDD
    }.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
When I run it on the Spark cluster, I get this error:
Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
I searched Stack Overflow and found a similar question: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.lookupTimeout. The answer there says to increase spark.timeout.network. Is that right? And where can I find spark.timeout.network?
The property is spark.network.timeout (there is no spark.timeout.network; the answer you found had the name reversed). For heavy workloads it is recommended to increase spark.network.timeout to 800 seconds, for example by passing it to spark-submit:
--conf spark.network.timeout=800
This is described in the IBM documentation about troubleshooting and tuning Spark for heavy workloads.
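Alternatively, the timeout can be set programmatically on the SparkConf before the StreamingContext is created. A minimal sketch, reusing the app name and master URL from your program (a bare number is read as seconds; a unit suffix like "800s" also works):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("AppName")
  .setMaster("spark://SparkMaster:7077")
  .set("spark.network.timeout", "800s")  // raise the network timeout from the 120s default
```

Command-line --conf values, spark-defaults.conf entries, and SparkConf.set all feed the same configuration; a value set in code takes precedence over the other two.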