
Applying Mapping Function on DataFrame

I have just started using databricks/pyspark. I'm using python/spark 2.1. I have uploaded data to a table. This table is a single column full of strings. I wish to apply a mapping function to each element in the column. I load the table into a dataframe:

df = spark.table("mynewtable")

The only way I could see, from what others were saying, was to convert it to an RDD, apply the mapping function, and then convert back to a dataframe to show the data. But this throws a "job aborted due to stage failure" error:

df2 = df.select("_c0").rdd.flatMap(lambda x: x.append("anything")).toDF()

All I want to do is apply any sort of map function to my data in the table. For example, append something to each string in the column, or perform a split on a char, and then put that back into a dataframe so I can .show() or display it.

asked Jul 30 '17 by yahalom

1 Answer

You cannot:

  • use flatMap, because it will flatten the Row
  • use append, because:

    • tuple and Row have no append method
    • append (where present, e.g. on list) is executed for its side effect and returns None
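The append pitfall is plain Python behavior, independent of Spark. A minimal illustration:

```python
# list.append mutates the list in place and returns None,
# so using its return value inside a lambda hands Spark None.
values = ["a"]
result = values.append("anything")
assert result is None
assert values == ["a", "anything"]

# Tuples (and Spark Rows, which behave like tuples) are
# immutable and have no append method at all.
row = ("a",)
assert not hasattr(row, "append")
```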

I would use withColumn:

df.withColumn("foo", lit("anything"))

but map should work as well:

df.select("_c0").rdd.map(lambda x: x + ("anything", )).toDF()
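Since a Row behaves like a tuple, the lambda above is just tuple concatenation. A plain-Python sketch of what gets applied to each row (using a tuple in place of a Row):

```python
# A Row such as Row(_c0='hello') behaves like the tuple ('hello',);
# concatenating another 1-tuple yields a 2-tuple, which toDF()
# would turn into a two-column row.
add_anything = lambda x: x + ("anything",)
assert add_anything(("hello",)) == ("hello", "anything")
```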

Edit (given the comment):

You probably want a udf:

from pyspark.sql.functions import udf

def iplookup(s):
    return ... # Some lookup logic

iplookup_udf = udf(iplookup)

df.withColumn("foo", iplookup_udf("_c0"))

The default return type is StringType; if you want something else, pass the desired type as the second argument to udf.

answered Oct 19 '22 by Alper t. Turker