Store aggregate value of a PySpark dataframe column into a variable

I am working with PySpark DataFrames. "test1" is my DataFrame and event_date is a TimestampType column. When I take a distinct count of event_date the result is an integer variable, but when I take the max of the same column the result is a DataFrame. I would like to understand which operations produce a DataFrame and which produce a plain variable, and also how to store the max of event_date in a variable.

Code that results in an integer type:

loop_cnt=test1.select('event_date').distinct().count()
type(loop_cnt)

Code that results in a DataFrame type:

last_processed_dt=test1.select([max('event_date')])
type(last_processed_dt)

Edited to add a reproducible example:

from datetime import datetime
from pyspark.sql.types import StructType, StructField, TimestampType
from pyspark.sql.functions import max

schema = StructType([StructField("event_date", TimestampType(), True)])

df = sqlContext.createDataFrame([(datetime(2015, 8, 10, 2, 44, 15),),
                                 (datetime(2015, 8, 10, 3, 44, 15),)], schema)

Code that returns a DataFrame:

last_processed_dt=df.select([max('event_date')])
type(last_processed_dt)

Code that returns a variable:

loop_cnt=df.select('event_date').distinct().count()
type(loop_cnt) 
asked May 02 '16 by Sid

People also ask

How do you sum the values of a column in PySpark?

By selecting sum() over the column you get the total value, and collect() brings that value back to the driver. Here df is the input PySpark DataFrame and column_name is the column to sum.
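A minimal sketch of that pattern (assuming a live sqlContext like the one in the question, and a made-up amount column):

from pyspark.sql.functions import sum as sql_sum  # aliased so Python's built-in sum() isn't shadowed

df = sqlContext.createDataFrame([(10,), (20,), (5,)], ['amount'])
# select() builds a one-row DataFrame; collect()[0][0] pulls out the value
total = df.select(sql_sum('amount')).collect()[0][0]
print(total)  # 35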

How do you use the AGG function in PySpark DataFrame?

The agg() function takes the column name and the aggregate function to apply. For example, sum() totals the grouped data for a column, and max() returns the largest value in the column.
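For example, a sketch with a hypothetical dept/amount DataFrame (same sqlContext as above):

from pyspark.sql import functions as F

df = sqlContext.createDataFrame([('books', 10), ('books', 20), ('toys', 5)],
                                ['dept', 'amount'])
# agg() applies one or more aggregate functions to each group
df.groupBy('dept').agg(F.sum('amount').alias('total'),
                       F.max('amount').alias('largest')).show()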

How do you assign a value to a DataFrame column in PySpark?

You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Since DataFrames are distributed, immutable collections, you can't really change column values in place; when you change a value using withColumn() or any other approach, PySpark returns a new DataFrame with the updated values.
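For instance (a sketch; the column names are made up):

from pyspark.sql import functions as F

df = sqlContext.createDataFrame([('a', 1), ('b', 2)], ['key', 'value'])
# withColumn() never mutates df; it returns a new DataFrame
updated = df.withColumn('value', F.col('value') * 10)
updated.show()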

How do I copy values from one column to another in PySpark?

The PySpark SQL function regexp_replace() replaces occurrences of a pattern in a column's values with another string; combined with withColumn(), this lets you derive one column's values from another, as sketched below.
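A sketch of both ideas, copying a column with withColumn() and rewriting substrings with regexp_replace():

from pyspark.sql import functions as F

df = sqlContext.createDataFrame([('2015-08-10',)], ['event_date'])
df = df.withColumn('event_date_copy', F.col('event_date'))  # plain column copy
df = df.withColumn('event_date',
                   F.regexp_replace('event_date', '-', '/'))  # substring replace
df.show()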


2 Answers

You cannot directly access the values in a DataFrame; collecting one gives you Row objects. A Row can, however, be converted into a Python dictionary. Walk through the following example, where I calculate the average word count:

wordsDF = sqlContext.createDataFrame([('cat',), ('elephant',), ('rat',), ('rat',), ('cat', )], ['word'])
wordCountsDF = wordsDF.groupBy(wordsDF['word']).count()
wordCountsDF.show()

Here are the word count results:

+--------+-----+
|    word|count|
+--------+-----+
|     cat|    2|
|     rat|    2|
|elephant|    1|
+--------+-----+

Now I calculate the average of the count column and apply the collect() operation to the result. Remember that collect() returns a list; here the list contains only one element.

averageCount = wordCountsDF.groupBy().avg('count').collect()

Result looks something like this.

[Row(avg(count)=1.6666666666666667)]

The average is still wrapped in a Row, not held in a plain Python variable. One way to extract it is to convert the Row into a dictionary:

results = {}
for i in averageCount:
    results.update(i.asDict())
print(results)

Our final results look like these:

{'avg(count)': 1.6666666666666667}

Finally, you can access the average value using:

print(results['avg(count)'])

1.6666666666666667
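Note that Row objects also support direct indexing, so the dictionary step can be skipped if you prefer:

print(averageCount[0][0])             # by position
print(averageCount[0]['avg(count)'])  # by column name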
answered by sujit

df.select([max('event_date')]) returns a DataFrame because select() is a transformation: it always produces a new DataFrame, even when the expression you pass it is an aggregate that can only ever yield a single row. The values are not brought back to the driver until you call an action such as collect().

df.select('event_date').distinct().count() returns an integer because count() is an action: it runs the query and tells you how many distinct values there are in that particular column. It does NOT tell you which value is the largest.

If you want to get the max event_date and store it in a variable, collect the one-row result and pull the value out of the Row (this assumes max is pyspark.sql.functions.max, not Python's built-in): max_date = df.select([max('event_date')]).collect()[0][0] — collect() returns a list of Row objects, and here the list holds a single Row whose first field is the max timestamp.
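An equivalent sketch using agg(); the dictionary form avoids shadowing Python's built-in max():

max_date = df.agg({'event_date': 'max'}).collect()[0][0]
print(max_date)  # datetime.datetime(2015, 8, 10, 3, 44, 15)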

answered by Katya Willard