Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Efficiently manipulating subsets of RDD's keys in spark

I have an RDD of (key, value) pairs of the form

RDD[(
  scala.collection.immutable.Vector[(Byte, Byte)],   
  scala.collection.immutable.Vector[Int]
)]

where key is a Vector[(Byte, Byte)] and value is Vector[Int].

For example, the contents of the RDD can be as shown below.

(Vector((3,3), (5,5)), Vector(1, 2)),
(Vector((1,1), (2,2), (3,3),(4,4), (5,5)), Vector(1, 3, 4, 2)), 
(Vector((1,1), (2,3)), Vector(1, 4, 2)), 
(Vector((1,1), (2,2), (5,5)), Vector(3, 5)),

I would like to do a manipulation on this RDD so that in the resultant RDD, for every (key, value) pairs the following condition is met.

When a key 'k1' of this RDD is a subset of key 'k2' of this RDD, k1's values should be updated to contain k2's values as well, while k2's values will remain the same.

The above example RDD will become,

(Vector((3,3), (5,5)), Vector(1, 2, 3, 4)), 
(Vector((1,1), (2,2), (3,3), (4,4), (5,5)), Vector(1, 3, 4, 2))
(Vector((1,1), (2,3)), Vector(1, 4, 2))
(Vector((1,1), (2,2), (5,5)), Vector(1, 2, 3, 4, 5))

I have asked a similar question here. The solution provided is given below(slightly modified to suit my problem). This works but very inefficient for large datasets.

val resultPre = rddIn
  .flatMap { case (colMapkeys, rowIds) => 
    colMapkeys.subsets.tail.map(_ -> rowIds)
  }
  .reduceByKey(_ ++ _)
  .join(rddIn map identity[(Seq[(Byte, Byte)], Vector[Int])])
  .map{ case (key, (v, _)) => (key, v) }


implicit class SubSetsOps[T](val elems: Seq[T]) extends AnyVal {
  def subsets: Vector[Seq[T]] = elems match {
    case Seq() => Vector(elems)
    case elem +: rest => {
      val recur = rest.subsets
      recur ++ recur.map(elem +: _)
    }
  }
}

Generating all subsets of keys and then filtering them by joining with original RDD keys seems to be ineffective.

How do I handle this efficiently?

like image 967
CRM Avatar asked Jan 22 '16 05:01

CRM


1 Answers

I think your problem is fundamentally hard. You basically have 2 ways to do this:

  1. Generate all subset keys, merge the lists of values and gather the fina list for any given subset, then join to existing subsets only. (this is what you are doing).

  2. Compare each entry to each other entry, see if one key is a subset of the other and then merge all subsets generated this way by key. This does not generate spurious key intermediate permutations.

Which one is more efficient will depend on the nature of your data (size of key vectors, number of times they are subsets of each other, etc).

The other optimizations you could try is to make the data a little simpler to handle. For instance, you could safely map your inner coordinates to integer (they are just Bytes). Say (5,5) to 5*1000 + 5 = 5005. Since integer comparison is easier and faster than comparing tuples.

Depending on how much you understand the domain of keys. If this space is small enough, you could try representing your keys as bitmaps or some such. These changes will not fundamentally change the number of keys you have, but might make comparisons and other operations much easier.

like image 125
Daniel Langdon Avatar answered Nov 04 '22 17:11

Daniel Langdon