I am using Spark to read a bunch of files, process them, and then save them all as Sequence files. What I wanted was to have one sequence file per partition, so I did this:
SparkConf sparkConf = new SparkConf().setAppName("writingHDFS")
        .setMaster("local[2]")
        .set("spark.streaming.stopGracefullyOnShutdown", "true");
final JavaSparkContext jsc = new JavaSparkContext(sparkConf);
jsc.hadoopConfiguration().addResource(hdfsConfPath + "hdfs-site.xml");
jsc.hadoopConfiguration().addResource(hdfsConfPath + "core-site.xml");
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5*1000));

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(sourcePath);
if (!imageByteRDD.isEmpty())
    imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, PortableDataStream>>>() {
        @Override
        public void call(Iterator<Tuple2<String, PortableDataStream>> arg0) throws Exception {
            [°°°SOME STUFF°°°]
            SequenceFile.Writer writer = SequenceFile.createWriter(
                    jsc.hadoopConfiguration(),
                    //here lies the problem: how to pass the hadoopConfiguration I have put inside the Spark Context?
Previously, I created a Configuration for each partition, and that works, but I'm sure there is a much more "Sparky" way to do it. Does anybody know how to use the Hadoop Configuration object inside the RDD closures?
The problem here is that Hadoop Configurations aren't tagged as Serializable, so Spark won't pull them into RDDs. They are marked as Writable, so Hadoop's serialization mechanism can marshal and unmarshal them, but Spark doesn't work with that directly.
The two long-term fix options would be either to make Hadoop's Configuration class implement Serializable, or to teach Spark how to serialize Writable types directly.
You aren't going to hit any major objections to making the Hadoop conf serializable, provided you implement custom ser/deser methods that delegate to the Writable IO calls (and which just iterate through all key/value pairs). I say that as a Hadoop committer.
Update: Here's the code for a serializable class that marshals the contents of a Hadoop config. Create it with val ser = new ConfigSerDeser(hadoopConf); refer to it in your RDD as ser.get().
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.hadoop.conf.Configuration
/**
 * Class to make Hadoop configurations serializable; uses the
 * `Writable` operations to do this.
 * Note: this only serializes the explicitly set values, not any set
 * in site/default or other XML resources.
 * @param conf the Hadoop configuration to wrap
 */
class ConfigSerDeser(var conf: Configuration) extends Serializable {

  def this() {
    this(new Configuration())
  }

  def get(): Configuration = conf

  /** Java-serialization hook: delegate to the Writable write() call. */
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    conf.write(out)
  }

  /** Java-serialization hook: rebuild an empty Configuration and read the fields back. */
  private def readObject(in: java.io.ObjectInputStream): Unit = {
    conf = new Configuration()
    conf.readFields(in)
  }

  /** Fall back to an empty Configuration if there is no serialized data. */
  private def readObjectNoData(): Unit = {
    conf = new Configuration()
  }
}
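For completeness, a hedged usage sketch along the lines described above (the RDD and the per-partition work are placeholders; only the ConfigSerDeser wrapper itself comes from the answer):

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

object ConfigSerDeserUsage {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("confSerDeser").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // Wrap the non-serializable Configuration once, on the driver.
    val ser = new ConfigSerDeser(sc.hadoopConfiguration)

    sc.parallelize(1 to 10, 2).foreachPartition { partition =>
      // ser traveled with the closure; get() hands back a Configuration on the executor.
      val conf = ser.get()
      val fs = FileSystem.get(conf)
      println(s"Partition sees default FS ${fs.getUri}, ${partition.size} records")
    }
  }
}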
Note that it would be relatively straightforward for someone to make this generic for all Writable classes; you'd just need to provide a classname in the constructor and use that to instantiate the Writable during deserialization.
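A hedged sketch of that generic variant (the class name WritableSerDeser is made up for illustration; it assumes the wrapped Writable has a no-arg constructor):

import org.apache.hadoop.io.Writable

/**
 * Generic version of the idea above: record the concrete Writable class name
 * on serialization and re-instantiate it on deserialization.
 */
class WritableSerDeser[T <: Writable](var writable: T) extends Serializable {

  def get(): T = writable

  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    // Store the class name so the instance can be recreated on the other side.
    out.writeUTF(writable.getClass.getName)
    writable.write(out)
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    // Re-instantiate via the recorded class name, then let the Writable
    // read its own fields back.
    val instance = Class.forName(in.readUTF())
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[T]
    instance.readFields(in)
    writable = instance
  }
}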
You can serialize and deserialize the org.apache.hadoop.conf.Configuration using org.apache.spark.SerializableWritable.
For example:
import org.apache.spark.SerializableWritable
...
val hadoopConf = spark.sparkContext.hadoopConfiguration
// serialize here
val serializedConf = new SerializableWritable(hadoopConf)
// then access the conf by calling .value on serializedConf
rdd.map(record => someFunction(record, serializedConf.value))
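Tying this back to the original question, here is a hedged sketch of using SerializableWritable to write one SequenceFile per partition; the source and output paths, and the Text/BytesWritable key/value choice, are illustrative:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, SequenceFile, Text}
import org.apache.spark.{SerializableWritable, TaskContext}
import org.apache.spark.sql.SparkSession

object OneSequenceFilePerPartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("writingHDFS").master("local[2]").getOrCreate()
    val sc = spark.sparkContext

    // Wrap the driver-side configuration so it can travel inside the closure.
    val serializedConf = new SerializableWritable(sc.hadoopConfiguration)

    val imageBytes = sc.binaryFiles("/tmp/images") // illustrative source path

    imageBytes.foreachPartition { partition =>
      val conf = serializedConf.value // Configuration, rebuilt on the executor
      val partitionId = TaskContext.getPartitionId()
      val writer = SequenceFile.createWriter(
        conf,
        SequenceFile.Writer.file(new Path(s"/tmp/output/part-$partitionId.seq")), // illustrative
        SequenceFile.Writer.keyClass(classOf[Text]),
        SequenceFile.Writer.valueClass(classOf[BytesWritable]))
      try {
        partition.foreach { case (name, stream) =>
          writer.append(new Text(name), new BytesWritable(stream.toArray()))
        }
      } finally {
        writer.close()
      }
    }
  }
}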