I'm trying to get word counts from a CSV when grouping on another column. My CSV has three columns: id, message and user_id. I read this in, split the message, and store a list of unigrams:
+-----------------+--------------------+--------------------+
| id| message| user_id|
+-----------------+--------------------+--------------------+
|10100720363468236|[i'm, sad, to, mi...|dceafb541a1b8e894...|
|10100718944611636|[what, does, the,...|dceafb541a1b8e894...|
|10100718890699676|[at, the, oecd, w...|dceafb541a1b8e894...|
+-----------------+--------------------+--------------------+
Next, given my dataframe df, I want to group by user_id and then get counts for each of the unigrams. As a simple first pass I tried grouping by user_id and getting the length of the grouped message field:
from collections import Counter
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import udf
df = self.session.read.csv(self.corptable, header=True,
                           mode="DROPMALFORMED")
# split my messages ....
# message is now ArrayType(StringType())
grouped = df.groupBy(df["user_id"])
counter = udf(lambda l: len(l), IntegerType())  # len() returns an int, so the declared return type is IntegerType
grouped.agg(counter(df["message"]))
print(grouped.collect())
I get the following error:
pyspark.sql.utils.AnalysisException: "expression '`message`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;"
Not sure how to get around this error. In general, how does one apply a function to one column when grouping on another? Do I always have to create a user-defined function? Very new to Spark.
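For reference, a minimal sketch on a made-up toy frame (not my real data) that counts tokens per user with only built-in functions; size() is an ordinary column function and sum() is a real aggregate, so agg() accepts it, unlike my plain Python UDF:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("token-count-sketch").getOrCreate()

# toy data in the same shape as my real frame: (user_id, tokenized message)
toy = spark.createDataFrame(
    [("u1", ["i'm", "sad", "to"]), ("u1", ["at", "the", "oecd"]), ("u2", ["hello"])],
    ["user_id", "message"])

# size() gives the token count per row; sum() aggregates those counts per user
toy.groupBy("user_id").agg(F.sum(F.size("message")).alias("n_tokens")).show()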
Edit: Here is how I solved this, given a tokenizer in a separate Python file:
group_field = "user_id"
message_field = "message"
context = SparkContext()
session = SparkSession\
.builder\
.appName("dlastk")\
.getOrCreate()
# add tokenizer
context.addPyFile(tokenizer_path)
from tokenizer import Tokenizer
tokenizer = Tokenizer()
spark_tokenizer = udf(tokenizer.tokenize, ArrayType(StringType()))
df = session.read.csv("myFile.csv", header=True,)
df = df[group_field, message_field]
# tokenize the message field
df = df.withColumn(message_field, spark_tokenizer(df[message_field]))
# create ngrams from tokenized messages
n = 1
grouped = df.rdd.map(lambda row: (row[0], Counter([" ".join(x) for x in zip(*[row[1][i:] for i in range(n)])]))).reduceByKey(add)
# flatten the rdd so that each row contains (group_id, ngram, count, relative frequency
flat = grouped.flatMap(lambda row: [[row[0], x,y, y/sum(row[1].values())] for x,y in row[1].items()])
# rdd -> DF
flat = flat.toDF()
flat.write.csv("myNewCSV.csv")
Data looks like:
# after read
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|To the douchebag ...|
|00035fb0dcfbeaa8b...| T minus 1 week...|
|00035fb0dcfbeaa8b...|Last full day of ...|
+--------------------+--------------------+
# after tokenize
+--------------------+--------------------+
| user_id| message|
+--------------------+--------------------+
|00035fb0dcfbeaa8b...|[to, the, doucheb...|
|00035fb0dcfbeaa8b...|[t, minus, 1, wee...|
|00035fb0dcfbeaa8b...|[last, full, day,...|
+--------------------+--------------------+
# grouped: after 1grams extracted and Counters added
[('00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', Counter({'!': 545, '.': 373, 'the': 306, '"': 225, ...
# flat: after calculating sum and relative frequency for each 1gram
[['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'face', 3, 0.000320547066994337], ['00035fb0dcfbeaa8bb70ffe24d614d4dcee446b803eb4063dccf14dd2a474611', 'was', 26, 0.002778074580617587] ....
# after flat RDD to DF
+--------------------+---------+---+--------------------+
| _1| _2| _3| _4|
+--------------------+---------+---+--------------------+
|00035fb0dcfbeaa8b...| face| 3| 3.20547066994337E-4|
|00035fb0dcfbeaa8b...| was| 26|0.002778074580617587|
|00035fb0dcfbeaa8b...| how| 22|0.002350678491291...|
+--------------------+---------+---+--------------------+
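A small follow-up sketch (the column names here are just my own choice, not from the original run): the auto-generated _1 ... _4 headers can be avoided by passing names to toDF() in the "rdd -> DF" step, e.g.:
# name the columns while converting the flattened RDD to a DataFrame
flat = flat.toDF(["user_id", "ngram", "count", "rel_freq"])
flat.write.csv("myNewCSV.csv", header=True)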
Try:
from pyspark.sql.functions import *

df.withColumn("word", explode("message")) \
    .groupBy("user_id", "word").count() \
    .groupBy("user_id") \
    .agg(collect_list(struct("word", "count")))
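A possible extension of this answer (my own sketch, assuming df is the question's frame with user_id and the tokenized message array): the relative frequencies computed in the question's edit can be obtained with a window over user_id instead of collecting lists:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

words = df.withColumn("word", F.explode("message"))
counts = words.groupBy("user_id", "word").count()
per_user = Window.partitionBy("user_id")

# divide each word's count by the user's total token count
counts.withColumn("rel_freq", F.col("count") / F.sum("count").over(per_user)).show()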