Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark: Difference between collect(), take() and show() outputs after conversion toDF

I am using Spark 1.5.

I have a column of 30 ids which I am loading as integers from a database:

val numsRDD = sqlContext
     .table(constants.SOURCE_DB + "." + IDS)
     .select("id")
     .distinct
     .map(row=>row.getInt(0))

This is the output of numsRDD:

numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608

Right next, I want to produce all combinations of 3 for those ids then save each combination as a tuple of the form: < tripletID: String, triplet: Array(Int)> and convert it into a dataframe, which I do as follows:

// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","), 
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")

As soon as I do that I try to print some of the combinationsDF's contents just to make sure that everything is the way it should be. So I try this:

combinationsDF.show

which returns:

+--------------------+--------------------+
|           tripletID|             triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
|2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows

As it is evident, the first element of every tripletID is missing. So, just to be 100% sure I use take(20) as follows:

combinationsDF.take(20).foreach(println(_))

which returns a more detailed representation as per below:

[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]

So now I am sure that the first id from tripletID is somehow for whatever reason deprecated. But still, if I try to use collect instead of take(20):

combinationsDF.collect.foreach(println(_))

everything goes back to being fine again (!!!):

[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...

1. I have exhaustively queried the steps just before I parallelize the array of combinations into an RDD and everything is ok. 2. I have also printed the output right after parallelize is applied and again everything is ok. 3. The problem appears to be related with the conversion of the numsRDD to a DF and despite my best efforts I cannot deal with it. 4. I was also incapable of reproducing the problem with mock data using the same code snippet.

So first: What's causing this problem? and second: How do I fix it?

like image 383
Christos Hadjinikolis Avatar asked Dec 06 '16 16:12

Christos Hadjinikolis


People also ask

What is the difference between collect and show in spark?

show() : It will show only the content of the dataframe. df. collect() : It will show the content and metadata of the dataframe.

What is difference between toDF and createDataFrame?

createDataFrame() and toDF() methods are two different way's to create DataFrame in spark. By using toDF() method, we don't have the control over schema customization whereas in createDataFrame() method we have complete control over the schema customization. Use toDF() method only for local testing.

What does collect function do in spark?

collect() action function is used to retrieve all elements from the dataset (RDD/DataFrame/Dataset) as a Array[Row] to the driver program.

What does collect () do in PySpark?

PySpark Collect() – Retrieve data from DataFrame. Collect() is the function, operation for RDD or Dataframe that is used to retrieve the data from the Dataframe. It is used useful in retrieving all the elements of the row from each partition in an RDD and brings that over the driver node/program.


2 Answers

  1. df.show() shows only content.

e.g.

df.show()
Out[11]: 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
  1. df.collect() shows content and structure/metadata.e.g.

    df.collect()
    Out[11]:
    [Row(age=None, name=u'Michael'),
    Row(age=30, name=u'Andy'),
    Row(age=19, name=u'Justin')]
    
    1. df.take(some number) can be used to shows content and structure/metadata for a limited number of rows for a very large dataset. note it flattens out the data and shows in a single row.

e.g. to see only first two rows of the dataframe

df.take(2)
Out[13]: 
[Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy')]
like image 85
Ratnesh Sharma Avatar answered Oct 22 '22 14:10

Ratnesh Sharma


I would check your original numsRDD, it looks like you might have an empty string or null value in there. This works for me:

scala> val numsRDD = sc.parallelize(0 to 30)
numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> :pa
// Entering paste mode (ctrl-D to finish)

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","),
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")

// Exiting paste mode, now interpreting.

combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows

The only other thing I can think of is mkString not working like you would expect. Try out this string interpolation (also no need to recreate the List):

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map{case List(a,b,c) => (
        s"$a,$b,$c", 
        Array(a,b,c))}
  .toDF("tripletID","triplet")

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows
like image 35
evan.oman Avatar answered Oct 22 '22 15:10

evan.oman