
Passing a data frame column and external list to udf under withColumn


I have a Spark dataframe with the following structure. The bodyText_token column holds the tokens (the processed set of words), and I have a nested list of predefined keywords.

root
 |-- id: string (nullable = true)
 |-- body: string (nullable = true)
 |-- bodyText_token: array (nullable = true)

keyword_list = [
    ['union', 'workers', 'strike', 'pay', 'rally', 'free', 'immigration'],
    ['farmer', 'plants', 'fruits', 'workers'],
    ['outside', 'field', 'party', 'clothes', 'fashions'],
]

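I need to count how many tokens fall under each keyword list and add the result as a new column of the existing dataframe. E.g., if tokens = ["become", "farmer", "rally", "workers", "student"], the result should be [2, 2, 0]: "rally" and "workers" are in the first list, "farmer" and "workers" are in the second, and nothing matches the third.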

The following function worked as expected.

def label_maker_topic(tokens, topic_words):
    # One count per keyword list: how many tokens appear in that list.
    twt_list = []
    for words in topic_words:
        count = 0
        for tkn in tokens:
            if tkn in words:
                count += 1
        twt_list.append(count)

    return twt_list
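As a quick local check outside Spark, the same counting can be run on the example tokens in plain Python. This is only a sketch: count_topic_hits is a hypothetical, compact stand-in for label_maker_topic, not a different algorithm.

```python
def count_topic_hits(tokens, topic_words):
    """Return one count per keyword list: how many tokens occur in that list."""
    return [sum(1 for tkn in tokens if tkn in words) for words in topic_words]


keyword_list = [
    ['union', 'workers', 'strike', 'pay', 'rally', 'free', 'immigration'],
    ['farmer', 'plants', 'fruits', 'workers'],
    ['outside', 'field', 'party', 'clothes', 'fashions'],
]
tokens = ["become", "farmer", "rally", "workers", "student"]

result = count_topic_hits(tokens, keyword_list)
print(result)  # [2, 2, 0]
```

Note that "workers" appears in two keyword lists, so it is counted once for each.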

I used a udf under withColumn to call the function, and I get an error. I think it's caused by passing an external list to a udf. Is there a way to pass both the external list and the dataframe column to a udf and add the result as a new column to my dataframe?

topicWord = udf(label_maker_topic,StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))
Jay asked May 24 '16 09:05



1 Answer

The cleanest solution is to pass additional arguments using a closure:

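from pyspark.sql.functions import col, udf

def make_topic_word(topic_words):
    # topic_words is captured by the lambda, so the UDF itself takes only the column.
    return udf(lambda c: label_maker_topic(c, topic_words))

df = sc.parallelize([(["union"], )]).toDF(["tokens"])

(df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
    .show())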

This doesn't require any changes to keyword_list or to the function you wrap with the UDF. The same method works for passing an arbitrary object, for example a list of sets for efficient lookups.
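The list-of-sets idea can be sketched in plain Python, without Spark. The names make_topic_counter and count_topics are hypothetical; the factory converts each keyword list to a frozenset once and returns a one-argument function that closes over them.

```python
def make_topic_counter(topic_words):
    """Build a counter that closes over the keyword lists, converted to sets once."""
    # Set membership tests are O(1) on average, versus O(n) for a list.
    topic_sets = [frozenset(words) for words in topic_words]

    def count_topics(tokens):
        # Count every token occurrence (duplicates included), like label_maker_topic.
        return [sum(1 for tkn in tokens if tkn in s) for s in topic_sets]

    return count_topics


keyword_list = [
    ['union', 'workers', 'strike', 'pay', 'rally', 'free', 'immigration'],
    ['farmer', 'plants', 'fruits', 'workers'],
    ['outside', 'field', 'party', 'clothes', 'fashions'],
]

count_topics = make_topic_counter(keyword_list)
counts = count_topics(["union", "workers", "party", "workers"])
print(counts)  # [3, 2, 1]
```

Since count_topics takes a single argument, it could presumably be wrapped with udf(...) and applied to col("tokens") in the same way as the lambda. A udf created without an explicit return type yields a string column, so declaring ArrayType(IntegerType()) is likely what you want if the counts should stay an array.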

If you want to use your current UDF and pass topic_words directly, you'll have to convert it to a column literal first:

from pyspark.sql.functions import array, lit

ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()

Depending on your data and requirements, there can be alternative, more efficient solutions that don't require UDFs (explode + aggregate + collapse) or lookups (hashing + vector operations).

zero323 answered Nov 03 '22 02:11