How to check if array column is inside another column array in PySpark dataframe

Suppose I have the following case

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("ev", ArrayType(StringType()), True),
    StructField("ev2", ArrayType(StringType()), True),])
df = spark.createDataFrame([{"id": "se1", "ev": ["ev11", "ev12"], "ev2": ["ev11"]},
                            {"id": "se2", "ev": ["ev11"], "ev2": ["ev11", "ev12"]},
                            {"id": "se3", "ev": ["ev21"], "ev2": ["ev11", "ev12"]},
                            {"id": "se4", "ev": ["ev21", "ev22"], "ev2": ["ev21", "ev22"]}],
                           schema=schema)

Which gives me:

df.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se1|[ev11, ev12]|      [ev11]|
|se2|      [ev11]|[ev11, ev12]|
|se3|      [ev21]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

I want to create a new boolean column (or select only the true cases) indicating, for each row, whether every element of the "ev" column is contained in the "ev2" column, returning:

df_target.show()
+---+------------+------------+
| id|          ev|         ev2|
+---+------------+------------+
|se2|      [ev11]|[ev11, ev12]|
|se4|[ev21, ev22]|[ev21, ev22]|
+---+------------+------------+

or:

df_target.show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

I tried using the isin method:

df.withColumn('evInEv2', df['ev'].isin(df['ev2'])).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|  false|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+

But it looks like it only checks whether the two arrays are equal, not whether one is contained in the other.

I also tried the array_contains function from pyspark.sql.functions, but it only accepts a single value to check for, not a whole array.
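
For example, this works, but it only tests one value at a time (the literal "ev11" here is just for illustration):

from pyspark.sql.functions import array_contains

# array_contains checks whether a single value occurs in the array;
# there is no way to pass a whole column of arrays as the value.
df.withColumn("has_ev11", array_contains(df.ev2, "ev11")).show()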

I am having difficulty even searching for this, since I am not sure how to phrase the problem correctly.

Thanks!

asked Feb 07 '17 by dtj

2 Answers

One more implementation, for Spark >= 2.4.0, that avoids a UDF by using the built-in array_except:

from pyspark.sql.functions import size, array_except

def is_subset(a, b):
    # a is a subset of b iff removing b's elements from a leaves nothing
    return size(array_except(a, b)) == 0

df.withColumn("is_subset", is_subset(df.ev, df.ev2)).show()

Output:

+---+------------+------------+---------+
| id|          ev|         ev2|is_subset|
+---+------------+------------+---------+
|se1|[ev11, ev12]|      [ev11]|    false|
|se2|      [ev11]|[ev11, ev12]|     true|
|se3|      [ev21]|[ev11, ev12]|    false|
|se4|[ev21, ev22]|[ev21, ev22]|     true|
+---+------------+------------+---------+
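
Since is_subset returns a boolean Column expression, selecting only the matching rows (the first desired output in the question) is just a filter; a minimal sketch reusing is_subset from above:

df.filter(is_subset(df.ev, df.ev2)).show()

This keeps only the se2 and se4 rows.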
answered by abiratsis

Here's an option using a udf, where we check the length of the set difference between the columns ev and ev2. When the difference is empty, i.e. all elements of ev are contained within ev2, we return True; otherwise False.

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def contains(x, y):
    # An empty set difference means every element of x also appears in y
    return len(set(x) - set(y)) == 0

# Declare the return type explicitly; udf() defaults to StringType,
# which would turn the result into the strings "true"/"false".
contains_udf = udf(contains, BooleanType())
df.withColumn("evInEv2", contains_udf(df.ev, df.ev2)).show()
+---+------------+------------+-------+
| id|          ev|         ev2|evInEv2|
+---+------------+------------+-------+
|se1|[ev11, ev12]|      [ev11]|  false|
|se2|      [ev11]|[ev11, ev12]|   true|
|se3|      [ev21]|[ev11, ev12]|  false|
|se4|[ev21, ev22]|[ev21, ev22]|   true|
+---+------------+------------+-------+
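
One caveat, since both array columns are nullable in the schema: a null in either column reaches the Python function as None, and set(None) raises a TypeError. A guarded variant of the function (a sketch, not part of the original answer):

def contains(x, y):
    # Propagate nulls instead of crashing on set(None)
    if x is None or y is None:
        return None
    return len(set(x) - set(y)) == 0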
answered by mtoto