Count occurrences of a list of substrings in a pyspark df column

I want to count the occurrences of a list of substrings in a PySpark DataFrame column that contains a long string, and create a new column holding the count.

Input:
       ID    History

       1     USA|UK|IND|DEN|MAL|SWE|AUS
       2     USA|UK|PAK|NOR
       3     NOR|NZE
       4     IND|PAK|NOR

lst = ['USA', 'IND', 'DEN']


Output:
       ID    History                      Count

       1     USA|UK|IND|DEN|MAL|SWE|AUS    3
       2     USA|UK|PAK|NOR                1
       3     NOR|NZE                       0
       4     IND|PAK|NOR                   1
asked Jul 16 '19 by Faliha Zikra



2 Answers

# Importing requisite packages and creating a DataFrame
from pyspark.sql.functions import split, col, size, regexp_replace
values = [(1,'USA|UK|IND|DEN|MAL|SWE|AUS'),(2,'USA|UK|PAK|NOR'),(3,'NOR|NZE'),(4,'IND|PAK|NOR')]
df = spark.createDataFrame(values, ['ID','History'])  # `spark` is the SparkSession (the original used sqlContext)
df.show(truncate=False)
+---+--------------------------+
|ID |History                   |
+---+--------------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |
|3  |NOR|NZE                   |
|4  |IND|PAK|NOR               |
+---+--------------------------+

The idea is to split the string on the three substrings in lst=['USA','IND','DEN'], treated as delimiters, and then count the number of pieces produced.

For example, the string USA|UK|IND|DEN|MAL|SWE|AUS gets split into the empty string, |UK|, |, and |MAL|SWE|AUS. Since 4 substrings were created from 3 delimiter matches, 4 - 1 = 3 gives the number of times these substrings appear in the column value.
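The same arithmetic can be checked in plain Python with re.split (a standalone illustration, not part of the Spark job):

import re

s = 'USA|UK|IND|DEN|MAL|SWE|AUS'
parts = re.split('USA|IND|DEN', s)  # regex alternation of the three substrings
print(parts)           # ['', '|UK|', '|', '|MAL|SWE|AUS']
print(len(parts) - 1)  # 3 -> one less than the number of pieces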

I am not sure whether multi-character delimiters are supported by Spark's split, so as a first step we replace each of the 3 substrings in the list ['USA','IND','DEN'] with a flag/dummy value % (you could use something else as well). The following code does this replacement:

df = df.withColumn('History_X', col('History'))  # work on a copy of the History column
lst = ['USA', 'IND', 'DEN']
for i in lst:
    # replace every occurrence of the substring with the dummy delimiter %
    df = df.withColumn('History_X', regexp_replace(col('History_X'), i, '%'))
df.show(truncate=False)
+---+--------------------------+--------------------+
|ID |History                   |History_X           |
+---+--------------------------+--------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|%|UK|%|%|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |%|UK|PAK|NOR        |
|3  |NOR|NZE                   |NOR|NZE             |
|4  |IND|PAK|NOR               |%|PAK|NOR           |
+---+--------------------------+--------------------+

Finally, we count the matches by splitting History_X on %, measuring the length of the resulting array with the size function, and subtracting 1.

df = df.withColumn('Count', size(split(col('History_X'), "%")) - 1).drop('History_X')
df.show(truncate=False)
+---+--------------------------+-----+
|ID |History                   |Count|
+---+--------------------------+-----+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|3    |
|2  |USA|UK|PAK|NOR            |1    |
|3  |NOR|NZE                   |0    |
|4  |IND|PAK|NOR               |1    |
+---+--------------------------+-----+
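As a side note, the second argument of Spark's split() is a Java regular expression, so multi-character delimiters are in fact supported and the replacement step can be skipped. A minimal one-step sketch (with the usual caveat that this matches raw substrings, so e.g. INDIA would also count as IND):

pattern = '|'.join(lst)  # 'USA|IND|DEN' -- regex alternation, not the field separator
df_one_step = df.withColumn('Count', size(split(col('History'), pattern)) - 1)
df_one_step.show(truncate=False)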
answered Sep 20 '22 by cph_sto


If you are using Spark 2.4+, you can try the Spark SQL higher-order function filter():

from pyspark.sql import functions as F

>>> df.show(5,0)
+---+--------------------------+
|ID |History                   |
+---+--------------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |
|3  |NOR|NZE                   |
|4  |IND|PAK|NOR               |
+---+--------------------------+

df_new = df.withColumn('data', F.split('History', r'\|')) \
           .withColumn('cnt', F.expr('size(filter(data, x -> x in ("USA", "IND", "DEN")))'))

>>> df_new.show(5,0)
+---+--------------------------+----------------------------------+---+
|ID |History                   |data                              |cnt|
+---+--------------------------+----------------------------------+---+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|[USA, UK, IND, DEN, MAL, SWE, AUS]|3  |
|2  |USA|UK|PAK|NOR            |[USA, UK, PAK, NOR]               |1  |
|3  |NOR|NZE                   |[NOR, NZE]                        |0  |
|4  |IND|PAK|NOR               |[IND, PAK, NOR]                   |1  |
+---+--------------------------+----------------------------------+---+

Here we first split the field History into an array column called data and then use the filter function:

filter(data, x -> x in ("USA", "IND", "DEN"))

to retrieve only the array elements that satisfy the condition IN ("USA", "IND", "DEN"); after that, we count the resulting array with the size() function.
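The IN list is hard-coded above; if the substrings live in a Python list, the SQL expression can be assembled from it (a small sketch, assuming the codes contain no characters that need escaping):

lst = ['USA', 'IND', 'DEN']
in_list = ', '.join('"{}"'.format(c) for c in lst)  # -> '"USA", "IND", "DEN"'
df_new = df.withColumn('data', F.split('History', r'\|')) \
           .withColumn('cnt', F.expr('size(filter(data, x -> x in ({})))'.format(in_list)))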

UPDATE: Added another way using array_contains(), which should work for older Spark versions:

lst = ["USA", "IND", "DEN"]

df_new = df.withColumn('data', F.split('History', r'\|')) \
           .withColumn('Count', sum([F.when(F.array_contains('data', e), 1).otherwise(0) for e in lst]))

Note: duplicate entries in the array are skipped; this method only counts each matching country code once per row.
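If duplicates do need to be counted on older Spark versions, one workaround (a sketch of an alternative, not from the original answer) is to explode the array and aggregate:

lst = ['USA', 'IND', 'DEN']
df_dup = (df.withColumn('token', F.explode(F.split('History', r'\|')))  # one row per country code
            .withColumn('hit', F.when(F.col('token').isin(lst), 1).otherwise(0))
            .groupBy('ID', 'History')
            .agg(F.sum('hit').alias('Count')))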

answered Sep 21 '22 by jxc