I am writing SparkStreaming
data to HDFS by converting it to a dataframe:
Code
object KafkaSparkHdfs {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
sparkConf.set("spark.driver.allowMultipleContexts", "true");
val sc = new SparkContext(sparkConf)
def main(args: Array[String]): Unit = {
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val ssc = new StreamingContext(sparkConf, Seconds(20))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "stream3",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("fridaydata")
val stream = KafkaUtils.createDirectStream[String, String](
ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
)
val lines = stream.map(consumerRecord => consumerRecord.value)
val words = lines.flatMap(_.split(" "))
val wordMap = words.map(word => (word, 1))
val wordCount = wordMap.reduceByKey(_ + _)
wordCount.foreachRDD(rdd => {
val dataframe = rdd.toDF();
dataframe.write
.mode(SaveMode.Append)
.save("hdfs://localhost:9000/newfile24")
})
ssc.start()
ssc.awaitTermination()
}
}
The folder is created but the file is not written.
The program is getting terminated with the following error:
18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
In my pom I am using respective dependencies:
The error is due to trying to run multiple spark contexts at the same time. Setting allowMultipleContexts
to true is mostly used for testing purposes and it's use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext
is used everywhere. From the code we can see that the SparkContext
(sc
) is used to create a SQLContext
which is fine. However, when creating the StreamingContext
it is not used, instead the SparkConf
is used.
By looking at the documentation we see:
Create a StreamingContext by providing the configuration necessary for a new SparkContext
In other words, by using SparkConf
as parameter a new SparkContext
will be created. Now there are two separate contexts.
The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext
to:
val ssc = new StreamingContext(sc, Seconds(20))
Note: In newer versions of Spark (2.0+) use SparkSession
instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...)
. It can look as follows:
val spark = SparkSession().builder
.setMaster("local[*]")
.setAppName("SparkKafka")
.getOrCreate()
import sqlContext.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
There is an obvious problem here - coalesce(1)
.
dataframe.coalesce(1)
While reducing number of files might be tempting in many scenarios, it should be done if and only if it amount of data is low enough for nodes to handle (clearly it isn't here).
Furthermore, let me quote the documentation:
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
The conclusion is you should adjust the parameter accordingly to the expected amount of data and desired parallelism. coalesce(1)
as such is rarely useful in practice, especially in a context like streaming, where data properties can differ over time.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With