 

Spark: How to translate count(distinct(value)) into the DataFrame API

I'm trying to compare different ways to aggregate my data.

This is my input data, consisting of (page, visitor) pairs:

(PAG1,V1) (PAG1,V1) (PAG2,V1) (PAG2,V2) (PAG2,V1) (PAG1,V1) (PAG1,V2) (PAG1,V1) (PAG1,V2) (PAG1,V1) (PAG2,V2) (PAG1,V3) 

Running a SQL query through Spark SQL with this code:

import sqlContext.implicits._

case class Log(page: String, visitor: String)

val logs = data.map(p => Log(p._1, p._2)).toDF()
logs.registerTempTable("logs")

val sqlResult = sqlContext.sql("""
  select page, count(distinct visitor) as visitor
  from logs
  group by page
""")

val result = sqlResult.map(x => (x(0).toString, x(1).toString))
result.foreach(println)

I get this output:

(PAG1,3) // PAG1 has been visited by 3 different visitors
(PAG2,2) // PAG2 has been visited by 2 different visitors

Now I would like to get the same result using DataFrames and their API, but I can't get the same output:

import sqlContext.implicits._

case class Log(page: String, visitor: String)

val logs = data.map(p => Log(p._1, p._2)).toDF()

val result = logs.select("page", "visitor")
  .groupBy("page")
  .count()
  .distinct

result.foreach(println)

Instead, this is the output I get:

[PAG1,8]  // just the simple page count for every page
[PAG2,4]
asked May 13 '15 by Fabio Fantoni


People also ask

How do I count distinct values in spark DataFrame?

In PySpark, there are two ways to get the count of distinct values. We can use the distinct() and count() functions of DataFrame to get the distinct count of a PySpark DataFrame. Another way is to use the SQL countDistinct() function, which returns the distinct value count of all the selected columns.
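That snippet describes PySpark; a minimal sketch of the same two approaches in the Scala API used elsewhere on this page, assuming the logs DataFrame from the question:

import org.apache.spark.sql.functions.countDistinct

// Approach 1: distinct() followed by count() on the selected column
val distinctVisitors = logs.select("visitor").distinct().count()

// Approach 2: the countDistinct aggregate function
val distinctVisitors2 = logs.agg(countDistinct("visitor")).first().getLong(0)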

How do I use count in spark DataFrame?

For counting the number of distinct rows we use distinct().count(), which returns the number of distinct rows in the DataFrame and can be stored in a variable such as 'row'. For counting the number of columns we use the length of df.columns.
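In Scala this looks much the same; a quick sketch, again assuming the logs DataFrame from the question:

// Number of distinct rows in the DataFrame
val rows = logs.distinct().count()

// Number of columns
val cols = logs.columns.length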

How do I get distinct values in Pyspark?

Distinct values of a column in PySpark are obtained by using the select() function along with the distinct() function. select() takes one or more column names as arguments; applying distinct() afterwards gives the distinct values of those columns combined.
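The Scala equivalent is a one-liner per case; a sketch assuming the logs DataFrame from the question:

// Distinct values of a single column
logs.select("page").distinct().show()

// Distinct combinations of several columns taken together
logs.select("page", "visitor").distinct().show()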

What is selectExpr in Pyspark?

pyspark.sql.DataFrame.selectExpr() is similar to select(), with the only difference being that it accepts SQL expressions (in string format) that will be executed. Again, this returns a new DataFrame derived from the original based on the input provided.
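selectExpr() exists in the Scala API as well; a sketch assuming the logs DataFrame from the question, where the upper() call is just an illustrative expression:

import org.apache.spark.sql.functions.{col, upper}

// select() takes Column objects ...
val a = logs.select(col("page"), upper(col("visitor")))

// ... while selectExpr() takes the same thing as SQL expression strings
val b = logs.selectExpr("page", "upper(visitor)")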




1 Answer

What you need is the DataFrame aggregation function countDistinct:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

case class Log(page: String, visitor: String)

val logs = data.map(p => Log(p._1, p._2)).toDF()

val result = logs.select("page", "visitor")
  .groupBy('page)
  .agg('page, countDistinct('visitor))  // count distinct visitors per page

result.foreach(println)
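Passing 'page to agg again reflects Spark 1.3 behavior, where grouping columns were not kept in the aggregation result. On later releases (1.4 and up, as far as I recall, via spark.sql.retainGroupColumns) the grouping column is retained automatically, so a shorter sketch like this should give the same output, assuming the same logs DataFrame:

val result = logs.groupBy("page")
  .agg(countDistinct("visitor").as("visitor"))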
answered Sep 30 '22 by yjshen