I have a dataframe and I want to apply a function to each row. This function depends on other dataframes.
Here's a simplified example. I have three dataframes like the ones below:
df = sc.parallelize([
['a', 'b', 1],
['c', 'd', 3]
]).toDF(('feat1', 'feat2', 'value'))
df_other_1 = sc.parallelize([
['a', 0, 1, 0.0],
['a', 1, 3, 0.1],
['a', 3, 10, 1.0],
['c', 0, 10, 0.2],
['c', 10, 25, 0.5]
]).toDF(('feat1', 'lower', 'upper', 'score'))
df_other_2 = sc.parallelize([
['b', 0, 4, 0.1],
['b', 4, 20, 0.5],
['b', 20, 30, 1.0],
['d', 0, 5, 0.05],
['d', 5, 22, 0.9]
]).toDF(('feat1', 'lower', 'upper', 'score'))
For each row of df, I want to collect the unique upper values for feat1 and feat2 from df_other_1 and df_other_2. For the first row, the unique values are (1, 3, 10, 4, 20, 30). Then I sort them in descending order, like (30, 20, 10, 4, 3, 1), and prepend a number one above the largest (31). df would then become:
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1]],
['c', 'd', 3, [26, 25, 22, 10, 5]]
]).toDF(('feat1', 'feat2', 'value', 'lst'))
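As a sanity check, here is a tiny pure-Python sketch of that list-building rule for the first row (the sets of uppers are copied from df_other_1 for 'a' and df_other_2 for 'b'):
# Unique upper values for feat1 = 'a' and feat2 = 'b', sorted descending
uppers = sorted({1, 3, 10} | {4, 20, 30}, reverse=True)  # [30, 20, 10, 4, 3, 1]
# Prepend one number above the largest
lst = [uppers[0] + 1] + uppers                           # [31, 30, 20, 10, 4, 3, 1]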
Then, for each row of df and for each of the respective values in lst, I want to calculate the sum of score from both df_other_1 and df_other_2, where the value from lst falls between lower and upper. My goal is to find the lowest value in each lst whose total score is above some threshold (e.g. 1.4).
Here's how the total score is calculated. For the first row of df, the first value in lst is 31. In df_other_1, for this row's feat1 ('a'), 31 is above the highest bucket, so it gets a score of 1.0. The same goes for df_other_2, so the total score is 1 + 1 = 2. For the value of 10 (again for the first row), the total score is 1 + 0.5 = 1.5.
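To make that rule concrete, here is a small pure-Python sketch of the scoring logic as I read it from the expected output below: interior upper bounds are exclusive, the topmost upper is inclusive, and anything above it scores 1.0. The bucket tuples are copied from the example data:
def bucket_score(buckets, value):
    # buckets: (lower, upper, score) tuples for one key, ordered by upper
    bs = sorted(buckets, key=lambda b: b[1])
    if value > bs[-1][1]:
        return 1.0              # above the highest bucket
    for lower, upper, score in bs:
        if value < upper:
            return score        # interior uppers are exclusive
    return bs[-1][2]            # value equals the topmost upper

a = [(0, 1, 0.0), (1, 3, 0.1), (3, 10, 1.0)]
b = [(0, 4, 0.1), (4, 20, 0.5), (20, 30, 1.0)]
print(bucket_score(a, 31) + bucket_score(b, 31))  # 2.0
print(bucket_score(a, 10) + bucket_score(b, 10))  # 1.5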
This is how df would look in the end:
df = sc.parallelize([
['a', 'b', 1, [31, 30, 20, 10, 4, 3, 1], [2.0, 2.0, 2.0, 1.5, 1.5, 1.1, 0.2], 4],
['c', 'd', 3, [26, 25, 22, 10, 5], [2.0, 1.5, 1.4, 1.4, 1.1], 25]
]).toDF(('feat1', 'feat2', 'value', 'lst', 'total_scores', 'target_value'))
I'm actually only looking for these target values, 4 and 25; the intermediate steps do not really matter.
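Given lst and total_scores, picking the target is then just the following (note that 22 in the second row is excluded because its score of 1.4 is not strictly above the threshold):
lst = [26, 25, 22, 10, 5]
total_scores = [2.0, 1.5, 1.4, 1.4, 1.1]
threshold = 1.4
target_value = min(v for v, s in zip(lst, total_scores) if s > threshold)  # 25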
==========================================================================
Here's what I tried so far:
from pyspark.sql.functions import col, lead, lag, when
from pyspark.sql.window import Window

def get_threshold_for_row(feat1, feat2, threshold):
    this_df_other_1 = df_other_1.filter(col('feat1') == feat1)
    this_df_other_2 = df_other_2.filter(col('feat1') == feat2)
    # Collect the unique upper bounds from both tables, sort them descending,
    # and prepend one number above the largest
    values_feat_1 = [i[0] for i in this_df_other_1.select('upper').collect()]
    values_feat_2 = [i[0] for i in this_df_other_2.select('upper').collect()]
    values = list(set(values_feat_1 + values_feat_2))  # keep unique values
    values.sort(reverse=True)                          # sort largest to smallest
    values.insert(0, values[0] + 1)
    prev_value = prev_score = 10000  # any large number
    for value in values:
        df_1_score = get_score_for_key(this_df_other_1, 'feat1', feat1, value)
        df_2_score = get_score_for_key(this_df_other_2, 'feat1', feat2, value)
        total_score = df_1_score + df_2_score
        if total_score < threshold and prev_score >= threshold:
            return prev_value
        prev_score = total_score
        prev_value = value
def is_dataframe_empty(df):
    return len(df.take(1)) == 0
def get_score_for_key(scores_df, grouping_key, this_id, value):
    if is_dataframe_empty(scores_df):
        return 0.0
    # Flag the first and last bucket per key so out-of-range values can be handled
    w = Window.partitionBy(grouping_key).orderBy(col('upper'))
    scores_df_tmp = scores_df.withColumn('next_upper', lead(scores_df.upper).over(w)) \
        .withColumn('is_last', when(col('next_upper').isNull(), 1).otherwise(0)) \
        .drop('next_upper')
    scores_df_tmp = scores_df_tmp.withColumn('prev_upper', lag(scores_df_tmp.upper).over(w)) \
        .withColumn('is_first', when(col('prev_upper').isNull(), 1).otherwise(0)) \
        .drop('prev_upper').cache()
    # Keep the bucket the value falls into (or the first/last bucket when the
    # value is out of range), then score it
    grouping_key_score = scores_df_tmp.filter(
        (col(grouping_key) == this_id) &
        (((col('lower') <= value) & (col('upper') > value)) |
         ((col('upper') <= value) & (col('is_last') == 1)) |
         ((col('lower') > value) & (col('is_first') == 1)) |
         col('lower').isNull())) \
        .withColumn('final_score', when(col('upper') >= value, col('score')).otherwise(1.0)) \
        .collect()[0]['final_score']
    return grouping_key_score
df.rdd.map(lambda r: (r['feat1'], r['feat2'])) \
    .map(lambda v: (v[0], v[1], get_threshold_for_row(v[0], v[1], 1.4))) \
    .toDF()
But I'm getting:
AttributeError: 'Py4JError' object has no attribute 'message'
Sorry for the long post. Any ideas?
tl;dr That is not possible in UDFs.
In the broadest sense, a UDF is a function (a Catalyst expression, actually) that accepts zero or more column values (as Column references).
A UDF can only work on records, which in the broadest case could be an entire DataFrame if the UDF is a user-defined aggregate function (UDAF). It can never reach into other DataFrames.
If you want to work with more than one DataFrame in a UDF, you have to join the DataFrames first, so that the columns you want to use are available to the UDF.
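For what it's worth, here is a minimal sketch of that join-based shape for this problem: stack the two lookup tables, join them to df once, collect the relevant buckets per row into a plain list, and only then score everything in ordinary Python inside a UDF. Column names follow the question; the helper names (others, keyed, pick_target) and the boundary behavior of the scoring rule are my assumptions, inferred from the expected output:
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# Tag each lookup table with the df column it should match, then stack them
others = df_other_1.withColumn('side', F.lit('feat1')) \
    .unionByName(df_other_2.withColumn('side', F.lit('feat2')))

# Turn each df row into (side, key) pairs so one equi-join reaches both tables
keyed = df.select(
    'feat1', 'feat2', 'value',
    F.explode(F.array(
        F.struct(F.lit('feat1').alias('side'), F.col('feat1').alias('key')),
        F.struct(F.lit('feat2').alias('side'), F.col('feat2').alias('key')))).alias('m')
).select('feat1', 'feat2', 'value', 'm.side', 'm.key')

# One join, then gather every relevant bucket into a plain list per df row
joined = keyed.join(others, (keyed.side == others.side) & (keyed.key == others.feat1)) \
    .groupBy(keyed.feat1, keyed.feat2, 'value') \
    .agg(F.collect_list(F.struct(others.side, 'lower', 'upper', 'score')).alias('buckets'))

def pick_target(buckets, threshold=1.4):
    # Candidate values: unique uppers, sorted descending, plus one above the largest
    uppers = sorted({b['upper'] for b in buckets}, reverse=True)
    candidates = [uppers[0] + 1] + uppers

    def side_score(side, value):
        bs = sorted((b for b in buckets if b['side'] == side), key=lambda b: b['upper'])
        if value > bs[-1]['upper']:
            return 1.0          # above the highest bucket (assumed)
        for b in bs:
            if value < b['upper']:
                return b['score']
        return bs[-1]['score']  # value equals the topmost upper (assumed inclusive)

    passing = [v for v in candidates
               if side_score('feat1', v) + side_score('feat2', v) > threshold]
    return min(passing) if passing else None

pick_target_udf = F.udf(pick_target, LongType())
result = joined.withColumn('target_value', pick_target_udf('buckets'))
The UDF here only touches plain Python lists that the join and collect_list already materialized per row, which is exactly what makes it legal where the original per-row DataFrame lookups were not.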