Aggregation with Group By date in Spark SQL

I have an RDD containing a timestamp named time of type long:

root
 |-- id: string (nullable = true)
 |-- value1: string (nullable = true)
 |-- value2: string (nullable = true)
 |-- time: long (nullable = true)
 |-- type: string (nullable = true)

I am trying to group by value1, value2 and time as YYYY-MM-DD. I tried to group by cast(time as Date) but then I got the following error:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.RuntimeException: [1.21] failure: ``DECIMAL'' expected but identifier Date found

Does that mean there is no way to group by a date? I even tried to add another level of casting to have it as a String:

cast(cast(time as Date) as String)

Which returns the same error.

I've read that I could probably use aggregateByKey on the RDD, but I don't understand how to use it for a few columns and how to convert that long to a YYYY-MM-DD String. How should I proceed?

asked May 05 '15 by galex


People also ask

How do you do aggregation in Spark?

You define a key or grouping, plus an aggregation function that specifies how the values within each group should be combined. Given multiple input values, the aggregation function generates one result per group.
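
For illustration, a minimal sketch of this in Scala (the DataFrame df and the numeric column amount are assumptions, not taken from the question above):

import org.apache.spark.sql.functions.sum

// Group by the key column and apply one aggregation function per group;
// one result row is produced for each distinct value of value1.
val totals = df.groupBy("value1").agg(sum("amount").as("total_amount"))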

What is AGG in Spark SQL?

agg (Java-specific) computes aggregates by specifying a map from column name to aggregate method. The resulting DataFrame will also contain the grouping columns. The available aggregate methods are avg, max, min, sum and count.
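
As a rough example of the map form (the column choices here are mine, loosely based on the question's schema):

// Map from column name to aggregate method; value1 and value2 (the grouping
// columns) are kept in the resulting DataFrame.
val summary = df.groupBy("value1", "value2").agg(Map("time" -> "min", "id" -> "count"))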

Can we use groupBy without aggregate function in Spark?

Although most of the time GROUP BY is used along with aggregate functions, it can still be used without them, for example to find unique records.
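
For example, something like this should return the distinct (value1, value2) pairs; the temp table name events is made up, assuming the DataFrame has been registered with registerTempTable:

// GROUP BY with no aggregate function: each distinct (value1, value2) pair appears once,
// the same result as df.select("value1", "value2").distinct()
val unique = sqlContext.sql("select value1, value2 from events group by value1, value2")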

How does groupBy work in Spark?

Similar to the SQL GROUP BY clause, the Spark SQL groupBy() function is used to collect identical data into groups on a DataFrame/Dataset and perform aggregate functions like count(), min(), max(), avg() and mean() on the grouped data.
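
A small sketch combining several of those functions (again assuming a DataFrame df shaped like the question's schema):

import org.apache.spark.sql.functions.{count, min, max}

// One row per distinct value1, with the row count and the earliest/latest time in each group.
val stats = df.groupBy("value1")
  .agg(count("id").as("rows"), min("time").as("first_seen"), max("time").as("last_seen"))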


3 Answers

I solved the issue by adding this function:

def convert(time: Long): String = {
  // Format an epoch timestamp in milliseconds as yyyy-MM-dd
  val sdf = new java.text.SimpleDateFormat("yyyy-MM-dd")
  sdf.format(new java.util.Date(time))
}

And registering it into the sqlContext like this:

sqlContext.registerFunction("convert", convert _)

Then I could finally group by date:

select * from table group by convert(time)
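
For the full grouping asked about (value1, value2 and the day), a query along these lines should also work with the registered UDF; the count(*) aggregate here is my own sketch, not part of the original answer:

// Hypothetical extension: group by value1, value2 and the formatted day, counting rows per group.
val byDay = sqlContext.sql(
  "select value1, value2, convert(time) as day_str, count(*) as cnt " +
  "from table group by value1, value2, convert(time)")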

answered by galex


I'm using Spark 1.4.0, and since 1.2.0 DATE appears to be present in the Spark SQL API (SPARK-2562). DATE should allow you to group by the time as YYYY-MM-DD.

I also have a similar data structure, where my created_on is analogous to your time field.

root
 |-- id: long (nullable = true)
 |-- value1: long (nullable = true)
 |-- created_on: long (nullable = true)

I solved it using FROM_UNIXTIME(created_on, 'yyyy-MM-dd') (lowercase yyyy, since uppercase YYYY is the week-based year in SimpleDateFormat) and it works well:

val countQuery = "SELECT FROM_UNIXTIME(created_on,'yyyy-MM-dd') as `date_created`, COUNT(*) AS `count` FROM user GROUP BY FROM_UNIXTIME(created_on,'yyyy-MM-dd')"

From here on you can do the normal operations, execute the query into a dataframe and so on.
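
For instance, a minimal usage sketch (assuming the same sqlContext and the registered user table from above):

// Run the query and look at the per-day counts.
val counts = sqlContext.sql(countQuery)
counts.show()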

FROM_UNIXTIME probably worked because I have Hive included in my Spark installation and it's a Hive UDF. However, it will be included as part of the Spark SQL native syntax in future releases (SPARK-8175).

answered by josemrivera


Not sure if this is what you meant/needed, but I've felt the same struggle dealing with date/timestamp in spark-sql, and the only thing I came up with was casting to a timestamp, since it seems impossible (to me) to have a Date type in spark-sql.

Anyway, this is my code to accomplish something similar to your need (maybe), with a Long in place of a String:

  val mySQL = sqlContext.sql("select cast(yourLong as timestamp) as time_cast" +
"                                    ,count(1) total "+
"                               from logs" +
"                              group by cast(yourLong as timestamp)" 
)
val result= mySQL.map(x=>(x(0).toString,x(1).toString))

and the output is something like this:

(2009-12-18 10:09:28.0,7)
(2009-12-18 05:55:14.0,1)
(2009-12-18 16:02:50.0,2)
(2009-12-18 09:32:32.0,2)
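
If a per-day count is what you are after rather than per-timestamp, a small follow-up sketch (my own addition, reusing the result RDD from above) could truncate the string to YYYY-MM-DD and re-aggregate:

// Keep only the yyyy-MM-dd prefix of the timestamp string, then sum the counts per day.
val byDay = result
  .map { case (ts, total) => (ts.take(10), total.toLong) }
  .reduceByKey(_ + _)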

Could this be useful for you as well even though I'm using timestamp and not Date?

Hope it could help

FF

EDIT: in order to test a "single-cast" from Long to Timestamp I've tried this simple change:

      val mySQL = sqlContext.sql("select cast(1430838439 as timestamp) as time_cast" +
"                                    ,count(1) total "+
"                               from logs" +
"                              group by cast(1430838439 as timestamp)" 
)
val result= mySQL.map(x=>(x(0),x(1)))

and all worked fine with the result:

(1970-01-17 14:27:18.439,4)  // 4 because I have 4 rows in my table

answered by Fabio Fantoni