
pyspark: arrays_zip equivalent in Spark 2.3

How can I write an equivalent of the arrays_zip function in Spark 2.3?

Source code from Spark 2.4:

def arrays_zip(*cols):
    """
    Collection function: Returns a merged array of structs in which the N-th struct contains all
    N-th values of input arrays.

    :param cols: columns of arrays to be merged.

    >>> from pyspark.sql.functions import arrays_zip
    >>> df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
    >>> df.select(arrays_zip(df.vals1, df.vals2).alias('zipped')).collect()
    [Row(zipped=[Row(vals1=1, vals2=2), Row(vals1=2, vals2=3), Row(vals1=3, vals2=4)])]
    """
    sc = SparkContext._active_spark_context
    return Column(sc._jvm.functions.arrays_zip(_to_seq(sc, cols, _to_java_column)))

How can I achieve something similar in PySpark on Spark 2.3?

bp2010 asked Apr 29 '20 14:04


2 Answers

You can achieve this by creating a user-defined function (UDF):

import pyspark.sql.functions as f
import pyspark.sql.types as t

# The lambda returns a list of tuples; each tuple maps positionally
# onto the struct fields declared in the return type.
arrays_zip_ = f.udf(
    lambda x, y: list(zip(x, y)),
    t.ArrayType(t.StructType([
        # Choose each field's data type to match the corresponding input column.
        t.StructField("first", t.IntegerType()),
        t.StructField("second", t.StringType()),
    ])),
)

df = spark.createDataFrame([([1, 2, 3], ['2', '3', '4'])], ['first', 'second'])

Result with the UDF on Spark <= 2.3:

df.select(arrays_zip_('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+

And the result with the built-in arrays_zip on Spark 2.4:

df.select(f.arrays_zip('first', 'second').alias('zipped')).show(2,False)

+------------------------+
|zipped                  |
+------------------------+
|[[1, 2], [2, 3], [3, 4]]|
+------------------------+
Shubham Jain answered Oct 18 '22 01:10


You can use a UDF to obtain the same functionality as arrays_zip. Note that the input arrays must all have the same element type for this to work (IntegerType in this case), because the UDF returns an array of arrays with a single element type. If the column types differ, cast the columns to a common type before applying the UDF.

from pyspark.sql import functions as F
from pyspark.sql import types as T

def zip_func(*args):
    return list(zip(*args))

zip_udf = F.udf(zip_func, T.ArrayType(T.ArrayType(T.IntegerType())))

It can be used in the same way as arrays_zip, for example:

df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
df.select(zip_udf(df.vals1, df.vals2).alias('zipped')).collect()
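One difference worth noting: Python's zip stops at the shortest input and raises a TypeError when an input is None, whereas arrays_zip in Spark 2.4, as far as I understand its behaviour, pads shorter arrays with nulls and returns null when any input array is null. The following is a minimal sketch of a row-level function that mimics that behaviour. The names zip_arrays and make_zip_udf are hypothetical and not part of PySpark; only F.udf is an actual PySpark call.

```python
from itertools import zip_longest


def zip_arrays(*arrays):
    """Pure-Python stand-in for the row-level behaviour of arrays_zip (hypothetical helper)."""
    # A null input array yields a null result instead of raising a TypeError.
    if any(a is None for a in arrays):
        return None
    # zip_longest pads shorter arrays with None; plain zip would truncate instead.
    return list(zip_longest(*arrays))


def make_zip_udf(return_type):
    """Wrap zip_arrays in a UDF with the given return type (hypothetical helper)."""
    # pyspark is imported lazily so zip_arrays can be tried out without Spark installed.
    from pyspark.sql import functions as F
    return F.udf(zip_arrays, return_type)
```

Assuming the same imports as above, make_zip_udf(T.ArrayType(T.ArrayType(T.IntegerType()))) could then be used in place of zip_udf. Treat the null handling as an assumption to verify against your own data rather than a guaranteed match for arrays_zip.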
Shaido answered Oct 18 '22 00:10