 

Keeping an unused column in Spark when using group by?


So I have a dataframe of usernames, the threads they have posted in, and the timestamps of those posts. What I am trying to do is figure out who the first poster in each thread was and at what time. I know that to find the first post I can group by thread and take the min of the timestamp, but that drops the username. How do I use the group by and keep the usernames?

asked Oct 26 '16 by atdy17


People also ask

Does groupBy preserve order PySpark?

The short answer is Yes, the hourly counts will maintain the same order. To generalise, it's important that you sort before you group.
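A sketch of the sort-then-group pattern that snippet describes, assuming a DataFrame df like the one in the question (Spark does not contractually guarantee ordering inside groups, so treat this as convention rather than a guarantee):

from pyspark.sql import functions as F

# Sort first, then aggregate; collect_list typically picks up the sorted order,
# though the API does not formally promise it.
ordered = df.orderBy("ts").groupBy("thread").agg(F.collect_list("user").alias("users"))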

How do I get other columns with Spark DataFrame groupBy?

Suppose you have a DataFrame that includes the columns "name" and "age", and you want to group on those two columns. To get the other columns back after doing a groupBy, you can join the aggregated result to the original DataFrame; the joined data will then have all the columns, including the count values.
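A minimal sketch of that join pattern, assuming a DataFrame df that has "name" and "age" among other columns:

from pyspark.sql import functions as F

# Aggregate on the grouping columns...
counts = df.groupBy("name", "age").agg(F.count("*").alias("count"))
# ...then join back to recover every original column plus the count.
data_joined = df.join(counts, on=["name", "age"])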

How do you get other columns in group by PySpark?

A PySpark groupBy on multiple columns can be performed either by passing a list of the DataFrame column names you want to group by, or by sending multiple column names as parameters to the groupBy() method.
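For example, using the question's column names, both forms below are equivalent:

# Multiple column names as parameters:
df.groupBy("thread", "user").count()
# Or a list of column names:
df.groupBy(["thread", "user"]).count()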

What does collect_list do in Spark?

The Spark function collect_list() is used to aggregate values into an ArrayType, typically after a group by or over a window partition.
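For instance, to gather each thread's posters into an array (a sketch using the question's column names):

from pyspark.sql import functions as F

users_per_thread = df.groupBy("thread").agg(F.collect_list("user").alias("users"))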


2 Answers

You can do this with a single groupBy by using a HiveContext and the Hive named_struct function. The trick is that min works on a struct by comparing its fields in order from left to right, only moving on to the next field when the current ones are equal. So in this case it is really just comparing the timestamp column, but because the struct also includes the name, you still have access to it after the min function spits out the result.

# `sc` and `sqlContext` come predefined in the PySpark shell; for named_struct,
# sqlContext must be a HiveContext (Spark 1.x).
data = [
    ('user', 'thread', 'ts'),
    ('ryan', 1, 1234),
    ('bob', 1, 2345),
    ('bob', 2, 1234),
    ('john', 2, 2223)
]

header = data[0]
rdd = sc.parallelize(data[1:])
df = sqlContext.createDataFrame(rdd, header)
df.registerTempTable('table')

# min() compares the struct field by field, so 'ts' decides the winner and
# 'user' just rides along inside the struct.
sql = """
SELECT thread, min(named_struct('ts', ts, 'user', user)) as earliest
FROM table
GROUP BY thread
"""

grouped = sqlContext.sql(sql)
# Unpack the struct fields back into plain columns.
final = grouped.selectExpr('thread', 'earliest.user as user', 'earliest.ts as timestamp')
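With the sample data above, final.show() returns ryan at 1234 for thread 1 and bob at 1234 for thread 2, i.e. the earliest poster per thread.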
answered Sep 22 '22 by Ryan Widmaier


This can be done using the row_number() window function, which keeps all the other columns intact. Use withColumn to create a new column, something like "thread_user_order", whose value is row_number() over a window partitioned by thread and ordered by ts. Then filter on "thread_user_order" == 1.

Here it is as runnable PySpark:

from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Rank each thread's posts by timestamp, then keep only the earliest one.
w = Window.partitionBy("thread").orderBy("ts")
first_posts = (df.withColumn("thread_user_order", row_number().over(w))
                 .where(col("thread_user_order") == 1))
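Note that row_number() keeps exactly one row per thread even when two posts share the earliest timestamp; use rank() instead if you want to keep all tied rows.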
answered Sep 24 '22 by Sergey Bahchissaraitsev