Filter by whether column value equals a list in Spark

I'm trying to filter a Spark dataframe based on whether the values in a column equal a list. I would like to do something like this:

filtered_df = df.where(df.a == ['list', 'of', 'stuff'])

Where filtered_df only contains rows where the value of filtered_df.a is ['list', 'of', 'stuff'] and the type of column a is array (nullable = true).

Luke asked Mar 24 '16


2 Answers

Update:

With current versions you can use an array of literals:

from pyspark.sql.functions import array, lit

df.where(df.a == array(*[lit(x) for x in ['list', 'of', 'stuff']]))

Original answer:

Well, a slightly hacky way to do it, one that doesn't require a Python batch job, is something like this:

from pyspark.sql.functions import col, lit, size
from functools import reduce
from operator import and_

def array_equal(c, an_array):
    same_size = size(c) == len(an_array)  # Check if the same size
    # Check if all items equal
    same_items = reduce(
        and_, 
        (c.getItem(i) == an_array[i] for i in range(len(an_array)))
    )
    return and_(same_size, same_items)
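The reduce-over-and_ pattern used above can be illustrated with a plain-Python analogue (list_equal is a hypothetical helper for illustration, not Spark code — it operates on ordinary lists rather than Column expressions):

```python
from functools import reduce
from operator import and_

def list_equal(xs, target):
    # Mirrors array_equal: same length, and every position matches
    if len(xs) != len(target):
        return False
    # reduce over and_ folds the per-position comparisons into a single
    # boolean, just as the Spark version folds Column expressions into
    # one filter predicate
    return reduce(and_, (a == b for a, b in zip(xs, target)), True)

print(list_equal(['list', 'of', 'stuff'], ['list', 'of', 'stuff']))  # True
print(list_equal(['list', 'of'], ['list', 'of', 'stuff']))           # False
```

The only difference in Spark is that `==` on a Column builds an expression instead of evaluating immediately, which is why `and_` (i.e. `&`) is used rather than Python's `and`.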

Quick test:

df = sc.parallelize([
    (1, ['list', 'of', 'stuff']),
    (2, ['foo', 'bar']),
    (3, ['foobar']),
    (4, ['list', 'of', 'stuff', 'and', 'foo']),
    (5, ['a', 'list', 'of', 'stuff']),
]).toDF(['id', 'a'])

df.where(array_equal(col('a'), ['list', 'of', 'stuff'])).show()
## +---+-----------------+
## | id|                a|
## +---+-----------------+
## |  1|[list, of, stuff]|
## +---+-----------------+
zero323 answered Sep 18 '22


You can use a combination of the array, lit and array_except functions to achieve this.

  1. We create an array column using lit(array(lit("list"), lit("of"), lit("stuff"))).
  2. Then we use the array_except function to get the values present in the first array but not in the second.
  3. Then we filter for an empty result array, which means no element of the first array falls outside ["list", "of", "stuff"].

Note: the array_except function is available from Spark 2.4.0.

Here is the code:

# Import libraries
from pyspark.sql.functions import *

# Create DataFrame
df = sc.parallelize([
    (1, ['list', 'of', 'stuff']),
    (2, ['foo', 'bar']),
    (3, ['foobar']),
    (4, ['list', 'of', 'stuff', 'and', 'foo']),
    (5, ['a', 'list', 'of', 'stuff']),
]).toDF(['id', 'a'])

# Solution
df1 = df.filter(size(array_except(df["a"], lit(array(lit("list"), lit("of"), lit("stuff"))))) == 0)

# Display result
df1.show() 

Output

+---+-----------------+
| id|                a|
+---+-----------------+
|  1|[list, of, stuff]|
+---+-----------------+

I hope this helps.
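One caveat worth noting: array_except returns the distinct elements of the first array that are absent from the second, so size(array_except(a, target)) == 0 only checks that every element of a appears in the target list, not that the two arrays are equal. A row like ['list', 'of'] would also pass this filter. The sketch below models that semantics in plain Python (array_except_model is a hypothetical stand-in for illustration, not PySpark's implementation):

```python
def array_except_model(a, b):
    # Approximates Spark's array_except: distinct elements of a not present in b
    result = []
    for x in a:
        if x not in b and x not in result:
            result.append(x)
    return result

target = ['list', 'of', 'stuff']
# The filter keeps any row whose elements are all drawn from the target list:
print(len(array_except_model(['list', 'of', 'stuff'], target)) == 0)       # True
print(len(array_except_model(['list', 'of'], target)) == 0)                # True (subset, not equal)
print(len(array_except_model(['a', 'list', 'of', 'stuff'], target)) == 0)  # False
```

If strict equality is required, the array-of-literals comparison from the first answer is the safer choice.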

Neeraj Bhadani answered Sep 21 '22