Apply a function to groupBy data with pyspark

I'm trying to get word counts from a CSV while grouping on another column. My CSV has three columns: id, message and user_id. I read it in, split the message, and store a list of unigrams:

+-----------------+--------------------+--------------------+
|               id|             message|             user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+
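(For reference, a minimal sketch of that splitting step, assuming simple whitespace tokenization with the built-in split function rather than my actual tokenizer:)

from pyspark.sql.functions import split

# split each raw message string on whitespace -> ArrayType(StringType())
df = df.withColumn("message", split(df["message"], r"\s+"))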

Next, given my dataframe df, I want to group by user_id and then get counts for each of the unigrams. As a simple first pass I tried grouping by user_id and getting the length of the grouped message field:

from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf

df = self.session.read.csv(self.corptable, header=True,
        mode="DROPMALFORMED",)

# split my messages ....
# message is now ArrayType(StringType())

grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), ArrayType(StringType()))
grouped.agg(counter(df["message"]))
print(grouped.collect())

I get the following error:

pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"

Not sure how to get around this error. In general, how does one apply a function to one column while grouping by another? Do I always have to create a user-defined function? I'm very new to Spark.
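(In hindsight, the error just means that any column not listed in the groupBy has to appear inside an aggregate expression. A minimal sketch of a legal version of that first pass, using the built-in size and sum functions instead of my len UDF:)

from pyspark.sql import functions as F

# non-grouped columns must live inside aggregate expressions,
# e.g. sum the per-row token counts for each user
df.groupBy("user_id").agg(F.sum(F.size("message")).alias("n_tokens")).show()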

Edit: Here is how I solved this, given a tokenizer in a separate Python file:

from collections import Counter
from operator import add

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

group_field = "user_id"
message_field = "message"

context = SparkContext()
session = SparkSession\
        .builder\
        .appName("dlastk")\
        .getOrCreate()

# ship the tokenizer module to the workers, then wrap it as a UDF
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))

df = session.read.csv("myFile.csv", header=True)
df = df[group_field, message_field]

# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))

# create ngrams from the tokenized messages and count them per group
n = 1
grouped = df.rdd \
    .map(lambda row: (row[0],
                      Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))) \
    .reduceByKey(add)

# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency)
flat = grouped.flatMap(lambda row: [[row[0], x, y, y / sum(row[1].values())]
                                    for x, y in row[1].items()])

# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")
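(The zip(*[row[1][i:] for i in range(n)]) idiom is what builds the n-grams; a quick standalone illustration on a made-up token list:)

from collections import Counter

tokens = ["at", "the", "oecd", "conference"]
n = 2
ngrams = [" ".join(x) for x in zip(*[tokens[i:] for i in range(n)])]
print(ngrams)           # ['at the', 'the oecd', 'oecd conference']
print(Counter(ngrams))  # Counter({'at the': 1, 'the oecd': 1, 'oecd conference': 1})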

Data looks like:

# after read
+--------------------+--------------------+
|             user_id|             message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...|   T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+

# after tokenize
+--------------------+--------------------+
|             user_id|             message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+

# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...

# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....

# after flat RDD to DF
+--------------------+---------+---+--------------------+
|                  _1|       _2| _3|                  _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...|     face|  3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...|      was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...|      how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+
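(The _1 ... _4 column names come from calling toDF() with no arguments; passing names explicitly gives a nicer schema. A small sketch, with column names picked just for illustration:)

# name the columns instead of relying on the default _1.._4
flat = flat.toDF(["user_id", "ngram", "count", "rel_freq"])
flat.printSchema()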


1 Answer

Try:

from pyspark.sql.functions import explode, collect_list, struct

# one row per (user_id, word), count each word per user,
# then collect the per-word counts back into one list column per user
df.withColumn("word", explode("message")) \
  .groupBy("user_id", "word").count() \
  .groupBy("user_id") \
  .agg(collect_list(struct("word", "count")))
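A possible extension (a sketch, not tested against the original data): the same explode-then-groupBy route can also produce the per-user relative frequencies from the question's edit, using a window over user_id instead of dropping to RDDs:

from pyspark.sql import functions as F
from pyspark.sql import Window

counts = df.withColumn("word", F.explode("message")) \
           .groupBy("user_id", "word").count()

# each word's count divided by the user's total word count
per_user = Window.partitionBy("user_id")
counts = counts.withColumn("rel_freq", F.col("count") / F.sum("count").over(per_user))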