I'm trying to compare different ways to aggregate my data.
This is my input data, where each element is a (page, visitor) pair:

    (PAG1,V1) (PAG1,V1) (PAG2,V1) (PAG2,V2) (PAG2,V1) (PAG1,V1) (PAG1,V2) (PAG1,V1) (PAG1,V2) (PAG1,V1) (PAG2,V2) (PAG1,V3)
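For reference, a minimal sketch of how this input can be built, assuming data is an RDD[(String, String)] and sc is the SparkContext:

    // Sketch: the sample (page, visitor) pairs as an RDD
    val data = sc.parallelize(Seq(
      ("PAG1", "V1"), ("PAG1", "V1"), ("PAG2", "V1"), ("PAG2", "V2"),
      ("PAG2", "V1"), ("PAG1", "V1"), ("PAG1", "V2"), ("PAG1", "V1"),
      ("PAG1", "V2"), ("PAG1", "V1"), ("PAG2", "V2"), ("PAG1", "V3")
    ))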
Using a SQL query in Spark SQL, with this code:

    import sqlContext.implicits._

    case class Log(page: String, visitor: String)

    val logs = data.map(p => Log(p._1, p._2)).toDF()
    logs.registerTempTable("logs")

    val sqlResult = sqlContext.sql(
      """select page, count(distinct visitor) as visitor
         from logs
         group by page""")
    val result = sqlResult.map(x => (x(0).toString, x(1).toString))
    result.foreach(println)
I get this output:
    (PAG1,3)  // PAG1 has been visited by 3 different visitors
    (PAG2,2)  // PAG2 has been visited by 2 different visitors
Now I would like to get the same result using DataFrames and their API, but I can't get the same output:
    import sqlContext.implicits._

    case class Log(page: String, visitor: String)

    val logs = data.map(p => Log(p._1, p._2)).toDF()
    val result = logs.select("page", "visitor")
      .groupBy("page")
      .count()
      .distinct
    result.foreach(println)
This is the output I get instead:
    [PAG1,8]  // just the simple page count for every page
    [PAG2,4]
Your version computes the plain row count per page: groupBy("page").count() returns the number of log entries for each page, and the trailing distinct only de-duplicates the already aggregated (page, count) rows, so it changes nothing. What you need is the DataFrame aggregation function countDistinct:
    import sqlContext.implicits._
    import org.apache.spark.sql.functions._

    case class Log(page: String, visitor: String)

    val logs = data.map(p => Log(p._1, p._2)).toDF()

    val result = logs.select("page", "visitor")
      .groupBy('page)
      .agg('page, countDistinct('visitor))

    result.foreach(println)
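If you prefer string-based column references, here is a minimal sketch of two equivalent formulations (assuming Spark 1.3+; the names result2 and result3 are just for illustration):

    import org.apache.spark.sql.functions.countDistinct

    // Equivalent: count distinct visitors per page, keeping the page column
    val result2 = logs.groupBy("page")
      .agg(countDistinct("visitor").alias("visitor"))

    // Alternative: de-duplicate the (page, visitor) pairs first,
    // then a plain row count per page gives the same numbers
    val result3 = logs.select("page", "visitor")
      .distinct()
      .groupBy("page")
      .count()

Both should produce (PAG1,3) and (PAG2,2) on the sample data above.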