Dividing two columns of two different DataFrames

I am using Spark to do exploratory data analysis on a user log file. One of the analyses I am doing is the average number of requests per host on a daily basis. To figure out this average, I need to divide the total-requests count column of one DataFrame by the unique-hosts count column of another DataFrame (daily_hosts_df).

from pyspark.sql.functions import dayofmonth

total_req_per_day_df = logs_df.select('host', dayofmonth('time').alias('day')).groupby('day').count()

avg_daily_req_per_host_df = total_req_per_day_df.select("day",(total_req_per_day_df["count"] / daily_hosts_df["count"]).alias("count"))

This is what I have written in PySpark to determine the average, and here is the error that I get:

AnalysisException: u'resolved attribute(s) count#1993L missing from day#3628,count#3629L in operator !Project [day#3628,(cast(count#3629L as double) / cast(count#1993L as double)) AS count#3630];

Note: daily_hosts_df and logs_df are cached in memory. How do you divide the count columns of the two DataFrames?

asked Dec 01 '22 by StackPointer

2 Answers

It is not possible to reference a column from another table. If you want to combine the data, you'll have to join first, using something similar to this:

from pyspark.sql.functions import col

(total_req_per_day_df.alias("total")
    .join(daily_hosts_df.alias("host"), ["day"])
    .select(col("day"), (col("total.count") / col("host.count")).alias("count")))
answered Dec 05 '22 by zero323

This is a question from an edX Spark course assignment. Since the solution is public now, I'll take the opportunity to share another, slower one, and ask whether its performance could be improved or whether it is totally anti-Spark:

import numpy as np

# Collect (day, unique-host count) pairs to the driver; take(30) assumes at
# most a month of days. (DataFrame.map is Spark 1.x; in 2.x+ use .rdd.map.)
daily_hosts_list = daily_hosts_df.map(lambda r: (r[0], r[1])).take(30)
days_with_hosts, hosts = zip(*daily_hosts_list)
requests = total_req_per_day_df.map(lambda r: r[1]).take(30)
# Do the element-wise division in NumPy on the driver, then rebuild a DataFrame.
average_requests = [(days_with_hosts[n], float(l)) for n, l in enumerate(np.array(requests, dtype=float) / np.array(hosts))]
avg_daily_req_per_host_df = sqlContext.createDataFrame(average_requests, ('day', 'avg_reqs_per_host_per_day'))
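
A note on the trade-off: this version uses Spark only for the initial aggregation, then pulls both results to the driver with take(30) and does the division in NumPy, and it relies on the two DataFrames coming back in the same day order. That is fine for at most a month of rows, but the join-based answer above keeps the division inside Spark's execution plan, so it scales with the data rather than with driver memory.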
answered Dec 05 '22 by marcindulak