Combine PySpark DataFrame ArrayType fields into single ArrayType field

I have a PySpark DataFrame with 2 ArrayType fields:

>>> df
DataFrame[id: string, tokens: array<string>, bigrams: array<string>]
>>> df.take(1)
[Row(id='ID1', tokens=['one', 'two', 'two'], bigrams=['one two', 'two two'])]

I would like to combine them into a single ArrayType field:

>>> df2
DataFrame[id: string, tokens_bigrams: array<string>]
>>> df2.take(1)
[Row(id='ID1', tokens_bigrams=['one', 'two', 'two', 'one two', 'two two'])]

The syntax that works with strings does not seem to work here:

df2 = df.withColumn('tokens_bigrams', df.tokens + df.bigrams) 

Thanks!

zemekeneng asked May 17 '16 18:05

People also ask

How do I concatenate multiple columns in PySpark?

The concat() function of PySpark SQL is used to concatenate multiple DataFrame columns into a single column. It can also be used to concatenate columns of string, binary, and compatible array types.
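As a minimal sketch (the column names first and last are hypothetical), concat() joins string columns value by value:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data: two string columns
people = spark.createDataFrame([("John", "Doe")], ("first", "last"))

people.select(concat(col("first"), col("last")).alias("full")).show()
# +-------+
# |   full|
# +-------+
# |JohnDoe|
# +-------+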

How does PySpark define ArrayType?

You can create an instance of an ArrayType using the ArrayType() class. It takes an element type and an optional containsNull argument that specifies whether elements may be null; it defaults to True. The element type should be a PySpark type that extends DataType.
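For instance, a minimal sketch of constructing the array-of-strings type used in the answer below:

from pyspark.sql.types import ArrayType, StringType

# An array of strings; containsNull defaults to True,
# so null elements are allowed
string_array = ArrayType(StringType())

# The same element type with null elements explicitly disallowed
strict_string_array = ArrayType(StringType(), containsNull=False)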

How do you combine columns in PySpark?

Concatenating columns in PySpark, whether two or many, is accomplished using the concat() function, as sketched below.
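A minimal sketch (the column names x, y, z are hypothetical); the related concat_ws() variant inserts a separator between string values:

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, concat_ws

spark = SparkSession.builder.getOrCreate()

letters = spark.createDataFrame([("a", "b", "c")], ("x", "y", "z"))

# concat() accepts any number of columns
letters.select(concat("x", "y", "z").alias("joined")).show()
# +------+
# |joined|
# +------+
# |   abc|
# +------+

# concat_ws() additionally takes a separator as its first argument
letters.select(concat_ws("-", "x", "y", "z").alias("joined")).show()
# +------+
# |joined|
# +------+
# | a-b-c|
# +------+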


1 Answer

Spark >= 2.4

You can use the concat function (SPARK-23736):

from pyspark.sql.functions import col, concat

df.select(concat(col("tokens"), col("tokens_bigrams"))).show(truncate=False)

# +---------------------------------+
# |concat(tokens, tokens_bigrams)   |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |null                             |
# +---------------------------------+

To keep data when one of the values is NULL, you can coalesce with array:

from pyspark.sql.functions import array, coalesce

df.select(concat(
    coalesce(col("tokens"), array()),
    coalesce(col("tokens_bigrams"), array())
)).show(truncate=False)

# +--------------------------------------------------------------------+
# |concat(coalesce(tokens, array()), coalesce(tokens_bigrams, array()))|
# +--------------------------------------------------------------------+
# |[one, two, two, one two, two two]                                   |
# |[three]                                                             |
# +--------------------------------------------------------------------+
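The same null-safe concatenation can be written as a single SQL expression via selectExpr, which also makes it easy to name the result tokens_bigrams as in the question; a sketch assuming the same df as above:

df.selectExpr(
    "concat(coalesce(tokens, array()), coalesce(tokens_bigrams, array())) AS tokens_bigrams"
).show(truncate=False)

# +---------------------------------+
# |tokens_bigrams                   |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |[three]                          |
# +---------------------------------+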

Spark < 2.4

Unfortunately, to concatenate array columns in the general case you'll need a UDF, for example like this:

from itertools import chain
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


def concat(type):
    # Chain all array arguments together, treating NULL (None) as an empty list
    def concat_(*args):
        return list(chain.from_iterable((arg if arg else [] for arg in args)))
    return udf(concat_, ArrayType(type))

which can be used as:

df = spark.createDataFrame(
    [(["one", "two", "two"], ["one two", "two two"]), (["three"], None)],
    ("tokens", "tokens_bigrams")
)

concat_string_arrays = concat(StringType())
df.select(concat_string_arrays("tokens", "tokens_bigrams")).show(truncate=False)

# +---------------------------------+
# |concat_(tokens, tokens_bigrams)  |
# +---------------------------------+
# |[one, two, two, one two, two two]|
# |[three]                          |
# +---------------------------------+
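To reproduce the df2 from the question, where the input columns are named tokens and bigrams, the same UDF can be combined with alias; a sketch assuming the asker's original df:

# Assumes df has the question's schema: id, tokens, bigrams
df2 = df.select(
    "id",
    concat_string_arrays("tokens", "bigrams").alias("tokens_bigrams")
)
# DataFrame[id: string, tokens_bigrams: array<string>]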
zero323 answered Oct 28 '22 05:10