
Why don't the implicit conversions for Writable work?

SparkContext defines several implicit conversions between Writable types and their corresponding primitives, such as LongWritable <-> Long and Text <-> String.

  • TEST CASE 1:

I am using the following code to combine small files

  @Test
  def testCombineSmallFiles(): Unit = {
    val path = "file:///d:/logs"
    val rdd = sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](path)
    println(s"rdd partition number is ${rdd.partitions.length}")
    println(s"lines is :${rdd.count()}")
  }

The above code works well, but if I use the following line to get the RDD, it results in a compile error:

val rdd = sc.newAPIHadoopFile[Long, String, CombineTextInputFormat](path)

It looks as though the implicit conversion doesn't take effect here. I would like to know what's wrong and why it isn't working.

  • TEST CASE 2:

With the following code, which uses sequenceFile, the implicit conversion appears to work (Text is converted to String, and IntWritable to Int):

  @Test
  def testReadWriteSequenceFile(): Unit = {
    val data = List(("A", 1), ("B", 2), ("C", 3))
    val outputDir = Utils.getOutputDir()
    sc.parallelize(data).saveAsSequenceFile(outputDir)
    //implicit conversion works for the SparkContext#sequenceFile method
    val rdd = sc.sequenceFile(outputDir + "/part-00000", classOf[String], classOf[Int])
    rdd.foreach(println)
  }

Comparing these two test cases, I can't see the key difference that makes one work and the other fail.

  • NOTE:

The SparkContext#sequenceFile method I am using in TEST CASE 2 is:

  def sequenceFile[K, V](
      path: String,
      keyClass: Class[K],
      valueClass: Class[V]): RDD[(K, V)] = withScope {
    assertNotStopped()
    sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
  }

This sequenceFile method calls another sequenceFile overload, which in turn calls hadoopFile to read the data:

  def sequenceFile[K, V](path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int
      ): RDD[(K, V)] = withScope {
    assertNotStopped()
    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
  }
Tom asked Apr 01 '26 15:04

1 Answer

For the implicit conversions to apply, a WritableConverter is needed. For example:

   def sequenceFile[K, V]
       (path: String, minPartitions: Int = defaultMinPartitions)
       (implicit km: ClassTag[K], vm: ClassTag[V],
        kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {...}

As far as I can see in the docs, sc.newAPIHadoopFile does not take such implicit converter parameters, so the conversion is not possible there.

Also, please verify that you have import SparkContext._ (I can't see the imports in your post).

Please have a look at WritableConverter in SparkContext, which has the code below:

/**
 * A class encapsulating how to convert some type `T` from `Writable`. It stores both the `Writable`
 * class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
 * conversion.
 * The getter for the writable class takes a `ClassTag[T]` in case this is a generic object
 * that doesn't know the type of `T` when it is created. This sounds strange but is necessary to
 * support converting subclasses of `Writable` to themselves (`writableWritableConverter()`).
 */
private[spark] class WritableConverter[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: Writable => T)
  extends Serializable

object WritableConverter {

  // Helper objects for converting common types to Writable
  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
  : WritableConverter[T] = {
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }

  // The following implicit functions were in SparkContext before 1.3 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, we still keep the old functions in SparkContext for backward
  // compatibility and forward to the following functions directly.

  implicit def intWritableConverter(): WritableConverter[Int] =
    simpleWritableConverter[Int, IntWritable](_.get)

  implicit def longWritableConverter(): WritableConverter[Long] =
    simpleWritableConverter[Long, LongWritable](_.get)

  implicit def doubleWritableConverter(): WritableConverter[Double] =
    simpleWritableConverter[Double, DoubleWritable](_.get)

  implicit def floatWritableConverter(): WritableConverter[Float] =
    simpleWritableConverter[Float, FloatWritable](_.get)

  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
    simpleWritableConverter[Boolean, BooleanWritable](_.get)

  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
    simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
      // getBytes method returns array which is longer than data to be returned
      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
    }
  }

  implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

  implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
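
To see the mechanism in isolation, here is a minimal, Spark-free imitation of that pattern (all names here — MiniWritable, MiniConverter, readPair — are invented for illustration). Because readPair demands converters as implicit parameters, the compiler fills them in from the companion object, just as sequenceFile does with WritableConverter; a method whose signature lacks those parameters, like newAPIHadoopFile, gives the compiler nothing to resolve:

```scala
// A stripped-down imitation of Spark's WritableConverter pattern.
trait MiniWritable
final case class MiniIntWritable(value: Int) extends MiniWritable
final case class MiniText(value: String) extends MiniWritable

class MiniConverter[T](val convert: MiniWritable => T)

object MiniConverter {
  // Mirrors intWritableConverter / stringWritableConverter: implicits that
  // the compiler finds in the companion object without any import.
  implicit def intConverter: MiniConverter[Int] =
    new MiniConverter[Int]({ case MiniIntWritable(v) => v })
  implicit def stringConverter: MiniConverter[String] =
    new MiniConverter[String]({ case MiniText(v) => v })
}

object MiniSequenceFile {
  // Mirrors sequenceFile[K, V]: the implicit parameters are what let a
  // caller name primitive types like String and Int directly.
  def readPair[K, V](k: MiniWritable, v: MiniWritable)
                    (implicit kc: MiniConverter[K], vc: MiniConverter[V]): (K, V) =
    (kc.convert(k), vc.convert(v))
}
```

Here `MiniSequenceFile.readPair[String, Int](MiniText("A"), MiniIntWritable(1))` compiles and converts automatically; with a signature that lacked the implicit parameters, naming `String`/`Int` would be a compile error, exactly like `newAPIHadoopFile[Long, String, ...]`.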

EDIT (in reply to the question update, "I have updated my question and given two test cases, one works, the other doesn't, but I can't figure out the difference between them"):

WritableConverters are needed for implicit conversion.

  • Test case 1, i.e. val rdd = sc.newAPIHadoopFile...(path): no implicit conversion is applied inside SparkContext here. That's why passing Long won't work and gives a compile error.

  • Test case 2, i.e. val rdd = sc.sequenceFile(...): you are passing classOf[...] directly, so no implicit conversion is needed; these are class objects, not Long or String values.
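
Given that, a practical workaround for test case 1 is to keep the Writable type parameters and convert explicitly with a map afterwards. The sketch below runs without Spark or Hadoop: LongWritable and Text are local stand-ins for the Hadoop classes, and a List stands in for the RDD; on a real RDD[(LongWritable, Text)] the .map call is identical:

```scala
// Local stand-ins for org.apache.hadoop.io.LongWritable / Text, so this
// sketch compiles without Hadoop on the classpath (in real code you would
// use the Hadoop classes themselves).
final case class LongWritable(get: Long)
final case class Text(s: String) { override def toString: String = s }

object CombineSmallFilesSketch {
  // The same .map you would call on the RDD[(LongWritable, Text)] returned
  // by sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](path).
  def convert(pairs: List[(LongWritable, Text)]): List[(Long, String)] =
    pairs.map { case (k, v) => (k.get, v.toString) }

  def main(args: Array[String]): Unit =
    println(convert(List(LongWritable(0L) -> Text("line one"),
                         LongWritable(9L) -> Text("line two"))))
}
```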

Ram Ghadiyaram answered Apr 04 '26 07:04


