Apache Spark: dealing with Option/Some/None in RDDs

I'm mapping over an HBase table, generating one RDD element per HBase row. However, sometimes the row has bad data (throwing a NullPointerException in the parsing code), in which case I just want to skip it.

I have my initial mapper return an Option to indicate that it returns 0 or 1 elements, then filter for Some, then get the contained value:

// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
  map( tuple => getData(tuple._2) ).
  filter( {case Some(y) => true; case None => false} ).
  map( _.get ).
  // ... more RDD operations with the good data

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  try {
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    Some( ( id, ( List(x),
          // more stuff ...
        ) ) )
  } catch {
    case e: NullPointerException => {
      logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
      None
    }
  }
}

Is there a more idiomatic way to do this that's shorter? I feel like this looks pretty messy, both in getData() and in the map.filter.map dance I'm doing.

Perhaps a flatMap could work (generate 0 or 1 items in a Seq), but I don't want it to flatten the tuples I'm creating in the map function, just eliminate empties.
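A quick way to check the flattening concern is to run the same pipeline on a local Scala List, which offers the same map/filter/flatMap shapes as an RDD. This is a sketch under that assumption: parse is a hypothetical stand-in for getData. It shows that flatMap over an Option removes only the Option layer, so the tuples come through intact and the result equals the map/filter/map version.

```scala
object FlattenCheck {
  // Hypothetical stand-in for getData: returns None for a "bad" (empty) row
  def parse(row: String): Option[(String, List[Long])] =
    if (row.isEmpty) None
    else Some((row, List(row.length.toLong)))

  val rows = List("abc", "", "de")

  // The map / filter / map dance from the question
  val viaFilter: List[(String, List[Long])] =
    rows.map(parse).filter(_.isDefined).map(_.get)

  // flatMap drops the Nones and unwraps the Somes; the tuples are not flattened
  val viaFlatMap: List[(String, List[Long])] =
    rows.flatMap(r => parse(r))
}
```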

Asked by Ken Williams on Mar 17 '15 at 15:03




3 Answers

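An alternative, and often overlooked, way is to use collect(pf: PartialFunction[T, U]), which selects the elements of the RDD for which the partial function is defined and applies the function to them. Note that this overload is a transformation that returns a new RDD, unlike the no-argument collect() action, which brings all the data back to the driver.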

The code would look like this:

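import scala.util.{Success, Try}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

// myRDD is RDD[(ImmutableBytesWritable, Result)]; keep only the rows that parsed successfully
val output = myRDD.map(t => getData(t._2)).collect { case Success(tuple) => tuple }

def getData(r: Result): Try[(String, List[Long])] = Try {
  val key = r.getRow
  val id = Bytes.toString(key, 0, 11)
  val x = Long.MaxValue - Bytes.toLong(key, 11)
  (id, List(x))
}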
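To see the collect idea without a cluster, here is a sketch on a local List; on an RDD, collect with a partial-function argument selects elements the same way but returns a new RDD. parseRow is a hypothetical stand-in for getData that uses String.toLong so that bad input throws.

```scala
import scala.util.{Success, Try}

object CollectDemo {
  // Hypothetical stand-in for getData: fails on input that is not a number
  def parseRow(row: String): Try[(String, List[Long])] = Try {
    val x = Long.MaxValue - row.toLong // throws NumberFormatException on bad input
    (row, List(x))
  }

  val rows = List("1", "oops", "10")

  // collect applies the partial function only where it is defined (the Success cases)
  val good: List[(String, List[Long])] =
    rows.map(parseRow).collect { case Success(tuple) => tuple }
}
```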
Answered by maasg on Oct 11 '22 at 23:10


If you change getData to return a scala.util.Try, you can simplify your transformations considerably. Something like this could work:

def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L

  val tr = util.Try{
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions

    ( id, ( List(x)
          // more stuff ...
     ) )
  } 

  tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
  tr
}

Then your transform could start like so:

myRDD.
  flatMap(tuple => getData(tuple._2).toOption)

If the Try is a Failure, toOption turns it into None, which flatMap then drops. The next step in your transform therefore works only with the successful cases, unwrapped to whatever type getData returns (i.e., no Option). flatMap removes only the Option layer, so the tuples themselves are not flattened.
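The same flow can be tried locally on a List. This sketch swaps the answer's logWarning for a hypothetical warnings buffer so the skipped rows are observable, and parseRow is again a hypothetical stand-in for getData. It shows that tr.failed.foreach runs only on failures and that toOption plus flatMap keeps just the successful values.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object TryToOptionDemo {
  // Hypothetical replacement for logWarning: collects messages instead of logging them
  val warnings = ListBuffer.empty[String]

  def parseRow(row: String): Try[(String, Long)] = {
    val tr = Try((row, row.toLong))
    // Side effect only on Failure; Success values pass through untouched
    tr.failed.foreach(e => warnings += s"Skipping row=$row; $e")
    tr
  }

  // toOption turns each Failure into None, which flatMap then drops
  val good: List[(String, Long)] =
    List("7", "x", "42").flatMap(r => parseRow(r).toOption)
}
```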

Answered by cmbaxter on Oct 12 '22 at 00:10


If you are OK with silently dropping the bad data, you can just use mapPartitions. Here is a sample:

import scala.util.Try

val mixedData = sc.parallelize(List(1, 2, 3, 4, 0))
mixedData.mapPartitions(iter =>
  iter
    .map(y => Try(1 / y))  // 1 / 0 becomes a Failure instead of failing the task
    .filter(_.isSuccess)   // keep only the successful results
    .map(_.get)            // unwrap them
)

If you want to keep track of the bad values, you can count them with an accumulator or just log them as you have been doing.

Your code would look something like this:

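import scala.util.Try
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

val output = myRDD.
  mapPartitions( tupleIter => getCleanData(tupleIter) )
  // ... more RDD operations with the good data

// Keep only the rows that parsed successfully, unwrapped from their Try
def getCleanData(iter: Iterator[(ImmutableBytesWritable, Result)]): Iterator[(String, List[Long])] =
  getDataInTry(iter).filter(_.isSuccess).map(_.get)

// Parse each row lazily, capturing any exception in a Failure
def getDataInTry(iter: Iterator[(ImmutableBytesWritable, Result)]): Iterator[Try[(String, List[Long])]] =
  iter.map { r =>
    Try {
      val key = r._2.getRow
      val id = Bytes.toString(key, 0, 11)
      val x = Long.MaxValue - Bytes.toLong(key, 11)
      // ... more code that might throw exceptions
      (id, List(x))
    }
  }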
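The function passed to mapPartitions is just an Iterator-to-Iterator transformation, so it can be exercised on a plain Scala Iterator. The sketch below follows the same wrap, filter, unwrap pattern as getCleanData, with integers standing in for HBase rows (an assumption for illustration). Because map and filter on an Iterator are lazy, the partition is streamed rather than materialized.

```scala
import scala.util.Try

object PartitionDemo {
  // Same shape as the function handed to mapPartitions: Iterator in, Iterator out
  def cleanPartition(iter: Iterator[Int]): Iterator[Int] =
    iter
      .map(y => Try(10 / y)) // wrap each element; 10 / 0 becomes a Failure
      .filter(_.isSuccess)   // keep only the Success cases
      .map(_.get)            // unwrap; safe because the Failures were removed
}
```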
Answered by Justin Pihony on Oct 11 '22 at 22:10