I'm mapping over an HBase table, generating one RDD element per HBase row. However, sometimes the row has bad data (throwing a NullPointerException in the parsing code), in which case I just want to skip it.
I have my initial mapper return an Option to indicate that it returns 0 or 1 elements, then filter for Some, then get the contained value:
// myRDD is RDD[(ImmutableBytesWritable, Result)]
val output = myRDD.
  map( tuple => getData(tuple._2) ).
  filter( {case Some(y) => true; case None => false} ).
  map( _.get ).
  // ... more RDD operations with the good data
def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L
  try {
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions
    Some( ( id, ( List(x),
      // more stuff ...
    ) ) )
  } catch {
    case e: NullPointerException => {
      logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e)
      None
    }
  }
}
Is there a more idiomatic way to do this that's shorter? I feel like this looks pretty messy, both in getData() and in the map.filter.map dance I'm doing.
Perhaps a flatMap could work (generating 0 or 1 items in a Seq), but I don't want it to flatten the tuples I'm creating in the map function, just eliminate the empties.
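On the worry about flattening: flatMap removes exactly one layer, the Option (or Seq) returned by the function, and leaves the tuples inside it intact. The sketch below uses a plain Scala List as a stand-in for the RDD (RDD.flatMap treats the returned collection the same way); parseRow is a hypothetical parser that returns None for a malformed row.

```scala
object FlatMapOptionDemo {
  // Hypothetical parser: returns None when the row is malformed
  def parseRow(raw: String): Option[(String, (List[Long], Int))] =
    raw.split(":") match {
      case Array(id, n) if n.nonEmpty && n.forall(_.isDigit) =>
        Some((id, (List(n.toLong), n.length)))
      case _ => None
    }

  val rows = List("a:1", "bad", "b:22")

  // flatMap strips only the Option layer; each (id, (List, Int)) tuple survives whole
  val output: List[(String, (List[Long], Int))] = rows.flatMap(r => parseRow(r))

  def main(args: Array[String]): Unit = println(output)
}
```

The "bad" row yields None and disappears, while the two good rows keep their nested tuple shape.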
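An alternative, and often overlooked, way would be to use collect(f: PartialFunction[T, U]), which is meant to 'select' or 'collect' the specific elements of the RDD at which the partial function is defined. (This is a transformation, not to be confused with the no-argument collect(), which is an action that returns all elements to the driver.)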
The code would look like this:
import scala.util.{Success, Try}

val output = myRDD.
  map( tuple => getData(tuple._2) ).
  collect { case Success(tuple) => tuple }

def getData(r: Result): Try[(String, List[Long])] = Try {
  val key = r.getRow
  val id = Bytes.toString(key, 0, 11)
  val x = Long.MaxValue - Bytes.toLong(key, 11)
  (id, List(x))
}
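To see what collect with a partial function does, here is a sketch on a plain List of Try values, since RDD.collect(PartialFunction) has the same shape. parseKey is a hypothetical stand-in for getData that parses a string instead of an HBase row key: the first 3 characters are the id and the rest must be a number.

```scala
import scala.util.{Success, Try}

object CollectDemo {
  // Hypothetical stand-in for getData: any exception inside Try becomes a Failure
  def parseKey(raw: String): Try[(String, List[Long])] = Try {
    val id = raw.take(3)
    val x = Long.MaxValue - raw.drop(3).toLong // NumberFormatException if not numeric
    (id, List(x))
  }

  val results: List[Try[(String, List[Long])]] = List("abc10", "xyz", "def20").map(parseKey)

  // collect applies the partial function only where it is defined, i.e. to Success values
  val good: List[(String, List[Long])] = results.collect { case Success(tuple) => tuple }

  def main(args: Array[String]): Unit = println(good)
}
```

"xyz" has no numeric suffix, so its Failure is simply not matched and drops out; filter and unwrap happen in one step.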
If you change your getData to return a scala.util.Try, then you can simplify your transformations considerably. Something like this could work:
def getData(r: Result) = {
  val key = r.getRow
  var id = "(unk)"
  var x = -1L
  val tr = util.Try{
    id = Bytes.toString(key, 0, 11)
    x = Long.MaxValue - Bytes.toLong(key, 11)
    // ... more code that might throw exceptions
    ( id, ( List(x)
      // more stuff ...
    ) )
  }
  // tr.failed is a Success(exception) only when tr is a Failure, so only failures get logged
  tr.failed.foreach(e => logWarning("Skipping id=" + id + ", x=" + x + "; \n" + e))
  tr
}
Then your transform could start like so:
myRDD.
  flatMap(tuple => getData(tuple._2).toOption)
If your Try is a Failure, it will be turned into a None via toOption and then removed as part of the flatMap logic. At that point, the next step in your transform will only be working with the successful cases, i.e. with whatever the underlying type returned from getData is, without the wrapping (no Option).
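A sketch of the whole Try/toOption pattern on plain collections, assuming a hypothetical getData that parses an Int from a string, and using println in place of Spark's logWarning. Each failure is logged once through tr.failed.foreach and then dropped by flatMap.

```scala
import scala.util.Try

object TryToOptionDemo {
  // Hypothetical parser; "x".toInt throws NumberFormatException, captured as a Failure
  def getData(raw: String): Try[(String, Int)] = {
    val tr = Try((raw, raw.toInt * 2))
    // Runs only for a Failure: tr.failed is Success(exception) in that case
    tr.failed.foreach(e => println(s"Skipping raw=$raw; $e"))
    tr
  }

  // toOption maps Failure to None, which flatMap then discards
  val output: List[(String, Int)] = List("1", "x", "3").flatMap(s => getData(s).toOption)

  def main(args: Array[String]): Unit = println(output)
}
```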
If you are OK with dropping the bad data, then you can just use mapPartitions. Here is a sample:
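import scala.util._
val mixedData = sc.parallelize(List(1, 2, 3, 4, 0))
mixedData.mapPartitions { x =>
  // Wrap each division; 1/0 throws ArithmeticException, captured as a Failure
  val foo = for (y <- x) yield Try(1 / y)
  // Keep only the successes and unwrap them; filter avoids the buffering
  // that Iterator.partition does for the half that is never consumed
  for (goodVals <- foo.filter(_.isSuccess)) yield goodVals.get
}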
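The function passed to mapPartitions is just an Iterator => Iterator, so its logic can be checked without a cluster. Below is a sketch of a slightly more compact variant (collect on Success instead of filter plus get), applied to a plain Iterator standing in for one partition; cleanPartition is a hypothetical name.

```scala
import scala.util.{Success, Try}

object PartitionDemo {
  // Same shape as the function given to mapPartitions: Iterator[Int] => Iterator[Int]
  def cleanPartition(part: Iterator[Int]): Iterator[Int] =
    part.map(y => Try(1 / y))             // 1 / 0 throws ArithmeticException -> Failure
      .collect { case Success(v) => v }   // keep and unwrap successes, still lazily

  val result: List[Int] = cleanPartition(Iterator(1, 2, 3, 4, 0)).toList

  def main(args: Array[String]): Unit = println(result)
}
```

Note that 1 / y is integer division, so 2, 3 and 4 all give 0; only the trailing 0 input is dropped.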
If you want to see the bad values, then you can use an accumulator or just log them as you have been.
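If the bad values should be inspected rather than silently dropped, the Try results can be split into successes and failures. This is a plain-Scala sketch; the local badCount only stands in for what a Spark accumulator (for example one created with sc.longAccumulator) would count across executors.

```scala
import scala.util.{Failure, Success, Try}

object BadValuesDemo {
  val tried: List[Try[Int]] = List(1, 2, 0, 4, 0).map(y => Try(10 / y))

  // Separate successes from failures so the bad inputs can be examined
  val (goodTries, badTries) = tried.partition(_.isSuccess)
  val good: List[Int] = goodTries.collect { case Success(v) => v }
  val errors: List[String] = badTries.collect { case Failure(e) => e.getClass.getSimpleName }

  // Local stand-in for an accumulator's count of skipped rows
  val badCount: Int = errors.size

  def main(args: Array[String]): Unit = println(s"good=$good bad=$badCount $errors")
}
```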
Your code would look something like this:
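import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import scala.util.Try

val output = myRDD.
  mapPartitions( tupleIter => getCleanData(tupleIter) )
  // ... more RDD operations with the good data

def getCleanData(iter: Iterator[(ImmutableBytesWritable, Result)]) = {
  val triedData = getDataInTry(iter)
  // Keep only the rows that parsed successfully, then unwrap them
  for (goodVals <- triedData.filter(_.isSuccess)) yield goodVals.get
}

def getDataInTry(iter: Iterator[(ImmutableBytesWritable, Result)]) = {
  for (r <- iter) yield {
    Try {
      val key = r._2.getRow
      val id = Bytes.toString(key, 0, 11)
      val x = Long.MaxValue - Bytes.toLong(key, 11)
      // ... more code that might throw exceptions
      (id, List(x))
    }
  }
}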