SparkContext defines a couple of implicit conversions between Writable types and their primitive counterparts, such as LongWritable <-> Long and Text <-> String.
I am using the following code to combine small files:
@Test
def testCombineSmallFiles(): Unit = {
  val path = "file:///d:/logs"
  val rdd = sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](path)
  println(s"rdd partition number is ${rdd.partitions.length}")
  println(s"lines is: ${rdd.count()}")
}
The above code works well, but if I use the following line to get the RDD, it results in a compile error:
val rdd = sc.newAPIHadoopFile[Long,String, CombineTextInputFormat](path)
It looks as if the implicit conversion doesn't take effect. I would like to know what's wrong here and why it isn't working.
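A common workaround (a sketch, using stand-in case classes rather than the real `org.apache.hadoop.io` types, so it runs without Spark or Hadoop) is to keep the Writable type parameters and convert each record explicitly with `map`:

```scala
// Stand-ins for Hadoop's LongWritable and Text, used here only to illustrate
// the per-record conversion; with Spark you would call
// rdd.map { case (k, v) => (k.get, v.toString) } on the real types.
final case class LongWritable(get: Long)
final case class Text(value: String) { override def toString: String = value }

// Pretend these pairs came back from newAPIHadoopFile[LongWritable, Text, ...]
val raw = Seq((LongWritable(0L), Text("first line")),
              (LongWritable(11L), Text("second line")))

// The explicit conversion: no implicits required.
val converted: Seq[(Long, String)] = raw.map { case (k, v) => (k.get, v.toString) }
println(converted)
```

This sidesteps the implicit machinery entirely, at the cost of one extra `map` over the data.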
With the following code that uses sequenceFile, the implicit conversion does appear to work (Text is converted to String, and IntWritable is converted to Int):
@Test
def testReadWriteSequenceFile(): Unit = {
  val data = List(("A", 1), ("B", 2), ("C", 3))
  val outputDir = Utils.getOutputDir()
  sc.parallelize(data).saveAsSequenceFile(outputDir)
  // implicit conversion works for the SparkContext#sequenceFile method
  val rdd = sc.sequenceFile(outputDir + "/part-00000", classOf[String], classOf[Int])
  rdd.foreach(println)
}
Comparing these two test cases, I can't see the key difference that makes one work and the other fail.
The SparkContext#sequenceFile method that I am using in TEST CASE 2 is:
def sequenceFile[K, V](
    path: String,
    keyClass: Class[K],
    valueClass: Class[V]): RDD[(K, V)] = withScope {
  assertNotStopped()
  sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
}
This sequenceFile method calls another sequenceFile overload, which in turn calls hadoopFile to read the data:
def sequenceFile[K, V](
    path: String,
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int): RDD[(K, V)] = withScope {
  assertNotStopped()
  val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
  hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}
To use the implicit conversions, a WritableConverter is needed in the method signature. For example:
def sequenceFile[K, V]
    (path: String, minPartitions: Int = defaultMinPartitions)
    (implicit km: ClassTag[K], vm: ClassTag[V],
     kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {...}
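The mechanism can be sketched without Spark. In a simplified stand-in (assumed names, and plain implicit values instead of Spark's `() => WritableConverter[K]` function parameters), a method whose implicit parameters depend on K and V lets the compiler pick the right converter from the type arguments alone:

```scala
// Simplified stand-in for Spark's WritableConverter pattern (not Spark code).
trait Writable
final case class IntWritable(get: Int) extends Writable
final case class Text(value: String) extends Writable { override def toString: String = value }

class Converter[T](val convert: Writable => T)

// Analogues of intWritableConverter() / stringWritableConverter().
implicit val intConverter: Converter[Int] =
  new Converter[Int](w => w.asInstanceOf[IntWritable].get)
implicit val stringConverter: Converter[String] =
  new Converter[String](w => w.asInstanceOf[Text].toString)

// Analogue of sequenceFile[K, V]: the compiler supplies kc and vc from K and V.
def read[K, V](pairs: Seq[(Writable, Writable)])
              (implicit kc: Converter[K], vc: Converter[V]): Seq[(K, V)] =
  pairs.map { case (k, v) => (kc.convert(k), vc.convert(v)) }

val data: Seq[(Writable, Writable)] = Seq((IntWritable(1), Text("one")))
println(read[Int, String](data))
```

Calling `read[Double, String](data)` here would fail to compile for lack of a `Converter[Double]`, which is the same shape of error the question hits with `newAPIHadoopFile[Long, String, ...]`.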
I can't see anywhere in the docs that sc.newAPIHadoopFile takes WritableConverter parameters, so the conversion isn't possible there.
Also, please verify that you have used `import SparkContext._` (I can't see the imports in your post).
Please have a look at WritableConverter in SparkContext, which has the code below:
/**
 * A class encapsulating how to convert some type `T` from `Writable`. It stores both the `Writable`
 * class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
 * conversion.
 * The getter for the writable class takes a `ClassTag[T]` in case this is a generic object
 * that doesn't know the type of `T` when it is created. This sounds strange but is necessary to
 * support converting subclasses of `Writable` to themselves (`writableWritableConverter()`).
 */
private[spark] class WritableConverter[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: Writable => T)
  extends Serializable

object WritableConverter {

  // Helper objects for converting common types to Writable
  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
    : WritableConverter[T] = {
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }

  // The following implicit functions were in SparkContext before 1.3 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, we still keep the old functions in SparkContext for backward
  // compatibility and forward to the following functions directly.

  implicit def intWritableConverter(): WritableConverter[Int] =
    simpleWritableConverter[Int, IntWritable](_.get)

  implicit def longWritableConverter(): WritableConverter[Long] =
    simpleWritableConverter[Long, LongWritable](_.get)

  implicit def doubleWritableConverter(): WritableConverter[Double] =
    simpleWritableConverter[Double, DoubleWritable](_.get)

  implicit def floatWritableConverter(): WritableConverter[Float] =
    simpleWritableConverter[Float, FloatWritable](_.get)

  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
    simpleWritableConverter[Boolean, BooleanWritable](_.get)

  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
    simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
      // getBytes method returns array which is longer than data to be returned
      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
    }
  }

  implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

  implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
EDIT:
I have updated my question to give two test cases; one works and the other doesn't, but I can't figure out the difference between them.
WritableConverters are needed for the implicit conversion. In test case 1, i.e. val rdd = sc.newAPIHadoopFile...(path), no implicit conversion is applied inside SparkContext for that method. That's why passing Long won't work and gives a compiler error.
In test case 2, i.e. val rdd = sc.sequenceFile(...), you are passing classOf[...] directly. When you pass classOf[...], no implicit conversion is needed, since these are classes, not Long or String values.
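The contrast can be shown in a small self-contained sketch (stand-in names, no Spark): passing Class[K]/Class[V] arguments pins down K and V at the call site through ordinary type inference, so no implicit search is ever triggered:

```scala
// Stand-in for the sequenceFile(path, keyClass, valueClass) overload.
// The Class arguments exist only to fix K and V for the compiler;
// no implicit Converter is searched for.
def readSeq[K, V](pairs: Seq[(Any, Any)],
                  keyClass: Class[K],
                  valueClass: Class[V]): Seq[(K, V)] =
  pairs.map { case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V]) }

val pairs: Seq[(Any, Any)] = Seq(("A", 1), ("B", 2), ("C", 3))

// K = String and V = Int are inferred from the Class arguments, just as
// with sc.sequenceFile(path, classOf[String], classOf[Int]).
val typed: Seq[(String, Int)] = readSeq(pairs, classOf[String], classOf[Int])
println(typed)
```

This mirrors why test case 2 compiles: the explicit Class arguments carry the type information that the implicit WritableConverter would otherwise have to supply.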