How to apply a function to a column of a Spark DataFrame?

Let's assume that we have a Spark DataFrame

df.getClass
Class[_ <: org.apache.spark.sql.DataFrame] = class org.apache.spark.sql.DataFrame

with the following schema

df.printSchema
root
|-- rawFV: string (nullable = true)
|-- tk: array (nullable = true)
|    |-- element: string (containsNull = true)

Given that each row of the tk column is an array of strings, how do I write a Scala function that returns the number of elements in each row?

asked Jan 05 '16 by ranlot


People also ask

How do you apply a function to a column in Spark DataFrame in Python?

In PySpark you wrap a user-defined function with udf (from pyspark.sql.functions) and then pass the target column to it, usually inside withColumn() or select(). Spark applies the function to every value in that column and returns the result as a new column.
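As an illustration, a minimal PySpark sketch; the rawFV column name is taken from the question's schema, while the sample data and the word_count function are invented for the example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a b c",), ("d e",)], ["rawFV"])

# wrap a plain Python function as a UDF and apply it to the column
word_count = udf(lambda s: len(s.split()), IntegerType())
df.withColumn("n_words", word_count(df["rawFV"])).show()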

How do you pass a column to a function in PySpark?

The withColumn() function of a PySpark DataFrame can also be used to change the values of an existing column. To change the values, pass the existing column name as the first argument and the new value as the second argument to withColumn(). Note that the second argument must be a Column expression.
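For example, a hedged sketch that overwrites an existing column; the sample data and column names are made up:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, upper

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("alice", 1), ("bob", 2)], ["name", "id"])

# pass the existing column name first and a Column expression second
df.withColumn("name", upper(col("name"))).show()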

How do I apply a function to multiple columns in PySpark?

You can use reduce, for loops, or list comprehensions to apply PySpark functions to multiple columns in a DataFrame. Using iterators to apply the same operation to multiple columns is key to keeping the codebase DRY.
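A small sketch of the reduce approach; the column names and the trim transformation are only illustrative:

from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, trim

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(" a ", " b ")], ["x", "y"])

# fold withColumn over every column to apply the same function to each
df2 = reduce(lambda acc, c: acc.withColumn(c, trim(col(c))), df.columns, df)
df2.show()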

How do you assign a value to a column in PySpark?

You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Since DataFrames are distributed, immutable collections, you can't really change column values in place; whichever approach you use, PySpark returns a new DataFrame with the updated values.
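A minimal sketch of the three approaches; the id column, the flag/score/label names, and the temp view t are invented for the example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["id"])

# each call returns a new DataFrame; the original df is left untouched
df_a = df.withColumn("flag", lit(True))             # assign via withColumn
df_b = df.select("id", lit(0).alias("score"))       # assign via select
df.createOrReplaceTempView("t")
df_c = spark.sql("select id, 'x' as label from t")  # assign via sql
df_c.show()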


2 Answers

You don't have to write a custom function because there is one:

import org.apache.spark.sql.functions.size

df.select(size($"tk"))

If you really want to, you can write a UDF:

import org.apache.spark.sql.functions.udf

val size_ = udf((xs: Seq[String]) => xs.size)
df.select(size_($"tk"))

or even create a custom expression, but there is really no point in that.

answered Oct 07 '22 by 2 revs


One way is to access the array elements using SQL, as shown below.

df.registerTempTable("tab1")
val df2 = sqlContext.sql("select tk[0], tk[1] from tab1")

df2.show()

To get size of array column,

val df3 = sqlContext.sql("select size(tk) from tab1")
df3.show()

If your Spark version is older, you can use HiveContext instead of Spark's SQLContext.

I would also try an approach that traverses the array elements themselves.

answered Oct 07 '22 by Srini