I come up with the exception:
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:889) at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:888) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) at org.apache.spark.rdd.RDD.withScope(RDD.scala:306) at org.apache.spark.rdd.RDD.foreach(RDD.scala:888) at com.Boot$.test(Boot.scala:60) at com.Boot$.main(Boot.scala:36) at com.Boot.main(Boot.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:525) Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer Serialization stack: - object not serializable (class: org.apache.kafka.clients.producer.KafkaProducer, value: org.apache.kafka.clients.producer.KafkaProducer@77624599) - field (class: com.Boot$$anonfun$test$1, name: producer$1, type: class org.apache.kafka.clients.producer.KafkaProducer) - object (class com.Boot$$anonfun$test$1, ) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
// @transient
val sparkConf = new SparkConf()
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// @transient
val sc = new SparkContext(sparkConf)
val requestSet: RDD[String] = sc.textFile(s"hdfs:/user/bigdata/ADVERTISE-IMPRESSION-STAT*/*")
// @transient
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, NearLineConfig.kafka_brokers)
// props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
// props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
props.put(ProducerConfig.BATCH_SIZE_CONFIG, "49152")
// @transient
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
requestSet.foreachPartition((partisions: Iterator[String]) => {
partisions.foreach((line: String) => {
try {
producer.send(new ProducerRecord[String, String]("testtopic", line))
} catch {
case ex: Exception => {
log.warn(ex.getMessage, ex)
}
}
})
})
producer.close()
In this program i try to read the records from the hdfs path and save them into kafka. the problem is when I remove the codes about sending records to kafka , it runs well. What I missed ?
KafkaProducer
isn't serializable. You'll need to move the creation of the instance to inside foreachPartition
:
requestSet.foreachPartition((partitions: Iterator[String]) => {
val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props)
partitions.foreach((line: String) => {
try {
producer.send(new ProducerRecord[String, String]("testtopic", line))
} catch {
case ex: Exception => {
log.warn(ex.getMessage, ex)
}
}
})
})
Note that KafkaProducer.send
returns a Future[RecordMetadata]
, and the only exception that can propagate from it is SerializationException
if the key or value can't be serialized.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With