So I have a dataframe of usernames, the threads they have posted in, and the timestamps of those posts. What I am trying to do is figure out who was the first user to post in each thread and at what time. I know that to find the first post I can group by thread and take the min of the timestamp, but that drops the username. How do I use the group by and keep the usernames?
The short answer is yes, the grouped (hourly) counts will keep the same order. To generalise: it's important that you sort before you group.
Suppose you have a df that includes columns "name" and "age", and you want to group by those two columns. To get the other columns back after a groupBy, you can join the aggregated result with the original DataFrame; the joined result will then have all the columns, including the count values.
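Here is a minimal sketch of that join-back idea applied to the question's data, assuming df is the posts DataFrame with columns thread, user and ts (the variable names are just for illustration):

from pyspark.sql import functions as F

# Earliest timestamp per thread; note this drops the user column
firsts = df.groupBy("thread").agg(F.min("ts").alias("ts"))

# Join back to the original rows on thread + ts to recover the user
# (ties on the minimum timestamp would give more than one row per thread)
result = firsts.join(df, on=["thread", "ts"], how="inner")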
PySpark groupBy on multiple columns can be done either by passing a list of the column names you want to group on, or by passing the column names as separate arguments to the groupBy() method, as in the sketch below.
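A quick sketch of both forms, again assuming a df with thread and user columns:

from pyspark.sql import functions as F

# Passing the names as separate arguments...
df.groupBy("thread", "user").agg(F.count("*").alias("posts"))

# ...or as a single list; both group on the same pair of columns
df.groupBy(["thread", "user"]).agg(F.count("*").alias("posts"))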
The Spark function collect_list() is used to aggregate values into an ArrayType column, typically after a group by or over a window partition. For example:
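A one-line sketch, gathering every user who posted in a thread into a single array column (same assumed df as above):

from pyspark.sql import functions as F

df.groupBy("thread").agg(F.collect_list("user").alias("users"))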
You can do this with one groupBy by using a HiveContext and the Hive named_struct function. The trick is that min will work on a struct by evaluating its fields in order from left to right, only moving on to the next field when the current ones are equal. So in this case it is really just comparing the timestamp column, but by making a struct that also includes the user name you will have access to it after the min function spits out the result.
# sc and sqlContext are assumed to be an existing SparkContext and HiveContext
data = [
    ('user', 'thread', 'ts'),
    ('ryan', 1, 1234),
    ('bob', 1, 2345),
    ('bob', 2, 1234),
    ('john', 2, 2223),
]
header = list(data[0])          # column names: user, thread, ts
rdd = sc.parallelize(data[1:])  # rows only, header removed
df = sqlContext.createDataFrame(rdd, header)
df.registerTempTable('table')

sql = """
SELECT thread, min(named_struct('ts', ts, 'user', user)) AS earliest
FROM table
GROUP BY thread
"""
grouped = sqlContext.sql(sql)
# Unpack the struct: the user and ts fields of the row with the minimum timestamp
final = grouped.selectExpr('thread', 'earliest.user AS user', 'earliest.ts AS timestamp')
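On more recent Spark versions the same struct trick can be written with the DataFrame API instead of SQL; a sketch, assuming the same df as above:

from pyspark.sql import functions as F

# min() on a struct compares fields left to right, so ts decides the winner
# and user simply comes along with it
earliest = df.groupBy("thread").agg(F.min(F.struct("ts", "user")).alias("earliest"))
final = earliest.select("thread", "earliest.user", "earliest.ts")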
This can also be done with the row_number() window function, which keeps all the other columns intact. Use withColumn to create a new column, something like "thread_user_order", whose value is row_number() over a window partitioned by thread and ordered by ts, then filter on "thread_user_order" == 1.
Here is roughly what that looks like in PySpark:
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number
df.withColumn("thread_user_order", row_number().over(Window.partitionBy(col("thread")).orderBy(col("ts")))).where(col("thread_user_order") == 1)