
Spark Streaming Exception: java.util.NoSuchElementException: None.get

I am writing SparkStreaming data to HDFS by converting it to a dataframe:

Code

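import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SaveMode
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaSparkHdfs {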

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF(); 
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")     
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The folder is created but the file is not written.

The program is getting terminated with the following error:

    18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
    18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

My pom.xml uses the following dependencies:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11
asked Oct 28 '22 09:10 by andani


2 Answers

The error is caused by running multiple SparkContexts at the same time. Setting allowMultipleContexts to true is intended mostly for testing, and its use is discouraged. The solution is therefore to make sure that the same SparkContext is used everywhere. In the code, the SparkContext (sc) is used to create the SQLContext, which is fine. The StreamingContext, however, is not created from it; the SparkConf is passed instead.

By looking at the documentation we see:

Create a StreamingContext by providing the configuration necessary for a new SparkContext

In other words, passing a SparkConf as the parameter creates a new SparkContext, so there are now two separate contexts.

The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext to:

val ssc = new StreamingContext(sc, Seconds(20))
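To make the fix concrete, here is a minimal sketch (not the asker's full program) that builds a single SparkContext and derives both the SQLContext and the StreamingContext from it. The object name SharedContextSketch is made up for illustration, and the Kafka stream is left out so the snippet only depends on spark-sql and spark-streaming.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical object name; only the context wiring is shown here.
object SharedContextSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")

    // The only place where a SparkContext is constructed.
    val sc = new SparkContext(sparkConf)

    // Both wrappers reuse sc instead of building their own context.
    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(20))

    // Reference equality: everything sits on the same SparkContext.
    assert(ssc.sparkContext eq sc)
    assert(sqlContext.sparkContext eq sc)

    // Stop the (never started) StreamingContext together with sc.
    ssc.stop(stopSparkContext = true)
  }
}
```

With this wiring the spark.driver.allowMultipleContexts setting should no longer be needed, since no second context is ever created.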

Note: In newer versions of Spark (2.0+) use SparkSession instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...). It can look as follows:

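import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

// SparkSession.builder is accessed on the companion object (no parentheses),
// and the builder methods are master/appName rather than setMaster/setAppName.
val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

// The implicits (needed for rdd.toDF()) now come from the session.
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))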
answered Nov 15 '22 09:11 by Shaido


There is an obvious problem here - coalesce(1).

dataframe.coalesce(1)

While reducing the number of files might be tempting in many scenarios, it should be done only if the amount of data is low enough for a single node to handle (clearly it isn't here).

Furthermore, let me quote the documentation:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

The conclusion is that you should adjust the parameter according to the expected amount of data and the desired parallelism. coalesce(1) as such is rarely useful in practice, especially in a context like streaming, where data properties can differ over time.
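As a rough illustration of the difference the quoted documentation describes, the sketch below builds a small DataFrame and reduces it to one partition in both ways. The object name, the column name n, the data, and the partition counts are all made up for the example; it assumes Spark 2.0+ with spark-sql on the classpath.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name and toy data, for illustration only.
object CoalesceVsRepartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .master("local[4]")
      .appName("CoalesceSketch")
      .getOrCreate()
    import spark.implicits._

    // 1000 integers spread over 8 input partitions.
    val df = spark.sparkContext.parallelize(1 to 1000, numSlices = 8).toDF("n")

    // coalesce(1): no shuffle, so upstream work may be pulled into a single task.
    val coalesced = df.coalesce(1)

    // repartition(1): adds a shuffle, so upstream partitions are still
    // processed in parallel before being merged into one.
    val repartitioned = df.repartition(1)

    assert(coalesced.rdd.getNumPartitions == 1)
    assert(repartitioned.rdd.getNumPartitions == 1)

    // Printing the physical plans lets you compare the two approaches.
    coalesced.explain()
    repartitioned.explain()

    spark.stop()
  }
}
```

Both variants end with a single output partition; they differ in how much of the upstream computation can still run in parallel.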

answered Nov 15 '22 09:11 by user9985951