
How to flatten tuples in Spark?

I'm looking to flatten an RDD of tuples (using a no-op map), but I'm getting a type error:

val fromTuples = sc.parallelize( List((1,"a"), (2, "b"), (3, "c")) )
val flattened = fromTuples.flatMap(x => x)
println(flattened.collect().toNiceString)

Gives

error: type mismatch;
 found   : (Int, String)
 required: TraversableOnce[?]
       val flattened = fromMap.flatMap(x => x)

The equivalent list of Lists or Arrays works fine, e.g.:

val fromList = sc.parallelize(List(List(1, 2), List(3, 4)))
val flattened = fromList.flatMap(x => x)
println(flattened.collect().toNiceString)

Can Scala handle this? If not, why not?

gleech asked Mar 21 '17 15:03



3 Answers

Tuples aren't collections. Unlike Python, where a tuple is essentially just an immutable list, a tuple in Scala is more like a class (or more like a Python namedtuple). You can't "flatten" a tuple, because it's a heterogeneous group of fields.

You can convert a tuple to something iterable by calling .productIterator on it, but what you get back is an Iterator[Any]. You can certainly flatten such a thing, but you lose all compile-time type protection that way. (Most Scala programmers shudder at the thought of a collection of type Any.)
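As a rough illustration of the point above, here is a minimal sketch that flattens tuples through productIterator. It uses a plain Scala List instead of an RDD so it runs without a Spark context; the assumption is that RDD.flatMap accepts the same kind of function. The object name ProductIteratorSketch is made up for this example.

```scala
object ProductIteratorSketch {
  // A local stand-in for the RDD of tuples from the question.
  val pairs: List[(Int, String)] = List((1, "a"), (2, "b"), (3, "c"))

  // productIterator walks the tuple's fields as an Iterator[Any],
  // so the flattened element type can only be Any.
  val flattened: List[Any] = pairs.flatMap(t => t.productIterator)

  def main(args: Array[String]): Unit =
    println(flattened.mkString(", ")) // prints: 1, a, 2, b, 3, c
}
```

Every element comes back typed as Any, so recovering an Int or a String afterwards needs a pattern match or a cast.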

Brian Clapper answered Sep 21 '22 06:09



There isn't a great way, but you can preserve a little type safety with this method:

val fromTuples = session.sparkContext.parallelize(List((1, "a"), (2, "b"), (3, "c")))
val flattened = fromTuples.flatMap(t => Seq(t._1, t._2))
println(flattened.collect().mkString)

The type of flattened will be an RDD of the nearest common supertype of the element types in the tuple. In this case that is Any, but if the list were List(("1", "a"), ("2", "b")), it would preserve the String type.
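The difference between the two cases can be sketched with plain Scala collections (again an assumption made so the example runs without Spark; the typing of the function passed to flatMap is the same idea). The object name SupertypeSketch is hypothetical. The explicit type annotations are there so the compiler confirms each element type.

```scala
object SupertypeSketch {
  // Mixed field types: the common supertype of Int and String is Any.
  val mixed: List[Any] =
    List((1, "a"), (2, "b")).flatMap(t => Seq(t._1, t._2))

  // Matching field types: the element type stays String,
  // so String methods remain available after flattening.
  val strings: List[String] =
    List(("1", "a"), ("2", "b")).flatMap(t => Seq(t._1, t._2))

  def main(args: Array[String]): Unit = {
    println(mixed.mkString(", "))                      // prints: 1, a, 2, b
    println(strings.map(_.toUpperCase).mkString(", ")) // prints: 1, A, 2, B
  }
}
```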

sfosdal answered Sep 21 '22 06:09



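  val fromTuples = sc.parallelize(List((1, "a"), (2, "b"), (3, "c")))
  // Wrapping each tuple in an Array satisfies flatMap's signature, but the
  // result still holds whole tuples (RDD[(Int, String)]), not their fields.
  val flattened = fromTuples.flatMap(x => Array(x))
  flattened.collect()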

The reason for your error is described in the Spark documentation for flatMap:

flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
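A small sketch of the "0 or more output items" behaviour quoted above, using a plain Scala List as a stand-in for an RDD (an assumption for runnability; FlatMapSketch is a made-up name). Each number n is mapped to a collection holding n copies of itself, so 0 contributes nothing, 1 contributes one item, and 2 contributes two.

```scala
object FlatMapSketch {
  // The function returns a collection per input item; flatMap concatenates them.
  val expanded: List[Int] = List(0, 1, 2).flatMap(n => List.fill(n)(n))

  def main(args: Array[String]): Unit =
    println(expanded.mkString(", ")) // prints: 1, 2, 2
}
```

A tuple is not a collection, so returning one from the function does not fit this contract, which is what the type mismatch in the question reports.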

Balaji Reddy answered Sep 21 '22 06:09
