 

How to zip two array columns in Spark SQL

I have a Pandas dataframe. I first split the two string columns into lists and then, using zip, joined each pair of elements with '_'. My data set looks like this:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

I wanted to join these two columns in a third column like below for each row of my dataframe.

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

I have successfully done this in Python using the code below, but the dataframe is quite large and running it over the whole dataframe takes a very long time. I want to do the same thing in PySpark for efficiency. I have read the data into a Spark dataframe successfully, but I'm having a hard time figuring out which PySpark functions replicate the Pandas ones. How can I get my desired result in PySpark?

df['column_3'] = df['column_2']
for index, row in df.iterrows():
    if isinstance(row['column_1'], str):
        # split both strings into lists and join the paired elements with '_'
        col1 = row['column_1'].split(',')
        col2 = row['column_2'].split(',')
        df.at[index, 'column_3'] = ['_'.join(map(str, pair)) for pair in zip(col1, col2)]

I have converted the two columns to arrays in PySpark using the code below:

from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col, split

# withColumn returns a new DataFrame, so assign the result back
crash = crash.withColumn(
    "column_1", split(col("column_1"), r",\s*").cast(ArrayType(StringType()))
)
crash = crash.withColumn(
    "column_2", split(col("column_2"), r",\s*").cast(ArrayType(StringType()))
)

Now all I need is to zip each element of the arrays in the two columns using '_'. How can I use zip with this? Any help is appreciated.

Falconic asked Jan 21 '19

2 Answers

A Spark SQL equivalent of Python's zip would be pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

So if you already have two arrays:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*")))

You can just apply it to the result:

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+

Now, to combine the results, you can use transform (see How to use transform higher-order function? and TypeError: Column is not iterable - How to iterate over ArrayType()?):

from pyspark.sql.functions import expr

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+

Note:

The higher-order function transform and arrays_zip were both introduced in Apache Spark 2.4.
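
For Spark 3.1 and later, pyspark.sql.functions.zip_with can merge the two array columns element-wise in a single step, without the intermediate struct column. A minimal sketch (the df_zip_with name is just illustrative), assuming the same df with column_1 and column_2 already split into arrays:

from pyspark.sql.functions import zip_with, concat_ws

# combine the arrays pairwise, joining each pair of elements with '_'
df_zip_with = df.withColumn(
    "column_3",
    zip_with("column_1", "column_2", lambda x, y: concat_ws("_", x, y))
)

df_zip_with.select("column_3").show(truncate=False)
+---------------------------+
|column_3                   |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+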

10465355 answered Sep 21 '22


You can also use a UDF to zip the split array columns:

df = spark.createDataFrame([('abc,def,ghi', '1.0,2.0,3.0')], ['col1', 'col2'])
df.show(truncate=False)
+-----------+-----------+
|col1       |col2       |
+-----------+-----------+
|abc,def,ghi|1.0,2.0,3.0|
+-----------+-----------+ ## Hope this is how your dataframe is

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

def concat_udf(*args):
    # zip the split arrays element-wise and join each pair with '_'
    return ['_'.join(x) for x in zip(*args)]

udf1 = F.udf(concat_udf, ArrayType(StringType()))
df = df.withColumn('col3', udf1(F.split(df.col1, ','), F.split(df.col2, ',')))
df.show(1, False)
+-----------+-----------+---------------------------+
|col1       |col2       |col3                       |
+-----------+-----------+---------------------------+
|abc,def,ghi|1.0,2.0,3.0|[abc_1.0, def_2.0, ghi_3.0]|
+-----------+-----------+---------------------------+
Suresh answered Sep 18 '22
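
Since the original motivation was speed, note that a plain Python UDF serializes every row to a Python worker, so on large data the built-in arrays_zip/transform (or zip_with) route is usually faster. If you prefer a UDF anyway, a vectorized pandas UDF can cut that overhead. A minimal sketch, assuming Spark 3.0+ for the type-hint style (the zip_join name is just illustrative):

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

@F.pandas_udf(ArrayType(StringType()))
def zip_join(col1: pd.Series, col2: pd.Series) -> pd.Series:
    # each element of col1/col2 is an array of strings, e.g. ['abc', 'def', 'ghi']
    return pd.Series(
        [['_'.join(pair) for pair in zip(xs, ys)] for xs, ys in zip(col1, col2)]
    )

df = df.withColumn('col3', zip_join(F.split(df.col1, ','), F.split(df.col2, ',')))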