
How to flatten a collection with Spark/Scala?


In Scala I can flatten a collection using:

val array = Array(List("1,2,3").iterator, List("1,4,5").iterator)
//> array  : Array[Iterator[String]] = Array(non-empty iterator, non-empty iterator)
array.toList.flatten
//> res0: List[String] = List(1,2,3, 1,4,5)

How can I do the same in Spark?

Reading the API docs at http://spark.apache.org/docs/0.7.3/api/core/index.html#spark.RDD, I cannot find a method that provides this functionality.

blue-sky asked Apr 17 '14 16:04



2 Answers

Use flatMap with the identity function from Predef; this is more readable than writing x => x.
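A minimal sketch of what this suggestion might look like, assuming Spark is on the classpath and a local master is acceptable. The object name `FlattenRdd`, the helper `flattenNested`, and the app name are hypothetical and not part of the original answer.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FlattenRdd {
  // Hypothetical helper: distributes a nested collection and flattens it
  // by passing Predef's identity to flatMap.
  def flattenNested(sc: SparkContext, nested: Seq[List[String]]): Array[String] =
    sc.parallelize(nested).flatMap(identity).collect()

  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[*]").setAppName("flatten-sketch")
    val sc = new SparkContext(conf)
    try {
      val flat = flattenNested(sc, Seq(List("a"), List("b"), List("c", "d")))
      println(flat.mkString(", "))
    } finally {
      sc.stop()
    }
  }
}
```

Here `flatMap(identity)` does the same job as `flatMap(x => x)`: each inner list is returned unchanged and Spark concatenates the results into a single RDD of elements.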

samthebest answered Sep 20 '22 08:09


Try flatMap with an identity map function (y => y):

scala> val x = sc.parallelize(List(List("a"), List("b"), List("c", "d")))
x: org.apache.spark.rdd.RDD[List[String]] = ParallelCollectionRDD[1] at parallelize at <console>:12

scala> x.collect()
res0: Array[List[String]] = Array(List(a), List(b), List(c, d))

scala> x.flatMap(y => y)
res3: org.apache.spark.rdd.RDD[String] = FlatMappedRDD[3] at flatMap at <console>:15

scala> x.flatMap(y => y).collect()
res4: Array[String] = Array(a, b, c, d)
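The same approach could be applied to the array of iterators from the question. The following is only a sketch, assuming a local Spark setup. The names `FlattenIterators` and `flattenIterators` are hypothetical. Because iterators are single-use, the sketch materializes each one as a list before handing the data to Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FlattenIterators {
  // Hypothetical helper: turns each iterator into a List, distributes the
  // lists as an RDD, then flattens with an identity map function.
  def flattenIterators(sc: SparkContext, iters: Array[Iterator[String]]): List[String] = {
    val lists = iters.map(_.toList).toSeq // consume each iterator exactly once
    sc.parallelize(lists).flatMap(y => y).collect().toList
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[*]").setAppName("flatten-iterators-sketch")
    val sc = new SparkContext(conf)
    try {
      val array = Array(List("1,2,3").iterator, List("1,4,5").iterator)
      println(flattenIterators(sc, array))
    } finally {
      sc.stop()
    }
  }
}
```

Note that each inner list in the question holds a single string such as "1,2,3", so flattening yields two strings rather than six numbers.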
Josh Rosen answered Sep 22 '22 08:09