I have a PySpark DataFrame with a column that contains a long string. I want to count how many substrings from a given list occur in that string and store the result in a new column.
Input:
ID  History
1   USA|UK|IND|DEN|MAL|SWE|AUS
2   USA|UK|PAK|NOR
3   NOR|NZE
4   IND|PAK|NOR
lst=['USA','IND','DEN']
Output:
ID  History                     Count
1   USA|UK|IND|DEN|MAL|SWE|AUS  3
2   USA|UK|PAK|NOR              1
3   NOR|NZE                     0
4   IND|PAK|NOR                 1
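# Import the required functions and create the example DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, size, regexp_replace
# Reuse the active session, or start one when running outside the pyspark shell
spark = SparkSession.builder.getOrCreate()
values = [(1,'USA|UK|IND|DEN|MAL|SWE|AUS'),(2,'USA|UK|PAK|NOR'),(3,'NOR|NZE'),(4,'IND|PAK|NOR')]
df = spark.createDataFrame(values,['ID','History'])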
df.show(truncate=False)
+---+--------------------------+
|ID |History                   |
+---+--------------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |
|3  |NOR|NZE                   |
|4  |IND|PAK|NOR               |
+---+--------------------------+
The idea is to treat the three substrings in lst=['USA','IND','DEN'] as delimiters, split the string on them, and count the pieces produced. For example, the string USA|UK|IND|DEN|MAL|SWE|AUS is split into four pieces: an empty string, |UK|, | and |MAL|SWE|AUS. Since 3 delimiter matches always produce 4 pieces, 4-1 = 3 gives the number of times these substrings appear in the column string.
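The pieces-minus-one arithmetic can be checked outside Spark. The following is a minimal plain-Python sketch, not the Spark code used in this answer: it relies on the standard re module, and the helper name split_and_count is made up for illustration.

```python
import re

def split_and_count(history, substrings):
    """Split `history` on any of `substrings`; return the pieces and the match count."""
    # Escape each substring so it is matched literally, then join into one alternation.
    pattern = "|".join(re.escape(s) for s in substrings)
    pieces = re.split(pattern, history)
    # n matches always yield n + 1 pieces, so the match count is len(pieces) - 1.
    return pieces, len(pieces) - 1

pieces, count = split_and_count("USA|UK|IND|DEN|MAL|SWE|AUS", ["USA", "IND", "DEN"])
print(pieces)  # ['', '|UK|', '|', '|MAL|SWE|AUS']
print(count)   # 3
```

A row with no match comes back as a single piece, so its count is 0.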
Spark's split() takes a regular expression, so multi-character delimiters would also work directly, but to keep each step easy to inspect we first replace every one of the 3 substrings in the list ['USA','IND','DEN'] with a flag/dummy value %. You could use something else as well, as long as it never occurs in the data. The following code does this replacement:
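# Work on a copy of the column so that History itself stays intact
df = df.withColumn('History_X',col('History'))
lst=['USA','IND','DEN']
# Replace every occurrence of each substring with the placeholder %
for i in lst:
    df = df.withColumn('History_X', regexp_replace(col('History_X'), i, '%'))
df.show(truncate=False)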
+---+--------------------------+--------------------+
|ID |History                   |History_X           |
+---+--------------------------+--------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|%|UK|%|%|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |%|UK|PAK|NOR        |
|3  |NOR|NZE                   |NOR|NZE             |
|4  |IND|PAK|NOR               |%|PAK|NOR           |
+---+--------------------------+--------------------+
Finally, we count the matches: split History_X with % as the delimiter, count the resulting pieces with the size function, and subtract 1.
df = df.withColumn('Count', size(split(col('History_X'), "%")) - 1).drop('History_X')
df.show(truncate=False)
+---+--------------------------+-----+
|ID |History                   |Count|
+---+--------------------------+-----+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|3    |
|2  |USA|UK|PAK|NOR            |1    |
|3  |NOR|NZE                   |0    |
|4  |IND|PAK|NOR               |1    |
+---+--------------------------+-----+
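For readers who want to trace the replace-then-split steps without a Spark session, here is a rough plain-Python mirror of them. It is a sketch under assumptions: the helper count_via_placeholder and the guard against a pre-existing % are additions for illustration, and the last row is hypothetical, included only to show that this approach matches substrings rather than whole country codes.

```python
import re

PLACEHOLDER = "%"  # assumed never to occur in the real data

def count_via_placeholder(history, substrings):
    """Mirror of the replace-then-split steps, in plain Python."""
    if PLACEHOLDER in history:
        raise ValueError("placeholder already present in the input")
    replaced = history
    for s in substrings:
        # regexp_replace treats its pattern as a regex; re.escape keeps it literal here.
        replaced = re.sub(re.escape(s), PLACEHOLDER, replaced)
    # Same arithmetic as size(split(...)) - 1.
    return len(replaced.split(PLACEHOLDER)) - 1

lst = ["USA", "IND", "DEN"]
rows = ["USA|UK|IND|DEN|MAL|SWE|AUS", "USA|UK|PAK|NOR", "NOR|NZE", "IND|PAK|NOR"]
counts = [count_via_placeholder(r, lst) for r in rows]
print(counts)  # [3, 1, 0, 1]

# Hypothetical row: the match is on substrings, so "USAX" also counts as a hit for "USA".
print(count_via_placeholder("USAX|UK", lst))  # 1
```

If partial matches like the last one are unwanted, splitting on | and comparing whole tokens avoids them.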
If you are using Spark 2.4+, you can try the Spark SQL higher-order function filter():
from pyspark.sql import functions as F
>>> df.show(5,0)
+---+--------------------------+
|ID |History                   |
+---+--------------------------+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|
|2  |USA|UK|PAK|NOR            |
|3  |NOR|NZE                   |
|4  |IND|PAK|NOR               |
+---+--------------------------+
# Raw string so that \| reaches Spark as a regex for a literal pipe
df_new = df.withColumn('data', F.split('History', r'\|')) \
    .withColumn('cnt', F.expr('size(filter(data, x -> x in ("USA", "IND", "DEN")))'))
>>> df_new.show(5,0)
+---+--------------------------+----------------------------------+---+
|ID |History                   |data                              |cnt|
+---+--------------------------+----------------------------------+---+
|1  |USA|UK|IND|DEN|MAL|SWE|AUS|[USA, UK, IND, DEN, MAL, SWE, AUS]|3  |
|2  |USA|UK|PAK|NOR            |[USA, UK, PAK, NOR]               |1  |
|3  |NOR|NZE                   |[NOR, NZE]                        |0  |
|4  |IND|PAK|NOR               |[IND, PAK, NOR]                   |1  |
+---+--------------------------+----------------------------------+---+
Here we first split the field History into an array column called data and then use the filter function, filter(data, x -> x in ("USA", "IND", "DEN")), to keep only the array elements that satisfy the condition IN ("USA", "IND", "DEN"). After that, we count the elements of the resulting array with the size() function.
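The same split, filter and size sequence can be traced in plain Python. This is only an illustrative sketch of the logic, not Spark code; the helper name count_tokens is an assumption made for the example.

```python
def count_tokens(history, wanted):
    """Split on '|' and count the tokens that are in `wanted` (duplicates included)."""
    tokens = history.split("|")                    # like F.split('History', r'\|')
    matching = [t for t in tokens if t in wanted]  # like filter(data, x -> x in (...))
    return len(matching)                           # like size(...)

wanted = {"USA", "IND", "DEN"}
rows = ["USA|UK|IND|DEN|MAL|SWE|AUS", "USA|UK|PAK|NOR", "NOR|NZE", "IND|PAK|NOR"]
token_counts = [count_tokens(r, wanted) for r in rows]
print(token_counts)  # [3, 1, 0, 1]
```

Because whole tokens are compared, a longer code that merely contains one of the wanted strings is not counted.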
UPDATE: Added another way using array_contains(), which should work on older versions of Spark:
lst = ["USA", "IND", "DEN"]
# Each code in lst adds 1 when the array contains it, otherwise 0
df_new = df.withColumn('data', F.split('History', r'\|')) \
    .withColumn('Count', sum([F.when(F.array_contains('data',e),1).otherwise(0) for e in lst]))
Note: duplicate entries in the array are counted only once, so this method counts unique country codes only.
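To make the difference concrete, here is a small plain-Python comparison of the two counting rules. It is a sketch only: the function names and the sample row with a repeated code are hypothetical and do not come from the question's data.

```python
def count_unique(history, wanted):
    """Like the array_contains() sum: each wanted code adds at most 1."""
    tokens = history.split("|")
    return sum(1 if code in tokens else 0 for code in wanted)

def count_all(history, wanted):
    """Like size(filter(...)): every matching token is counted."""
    return sum(1 for t in history.split("|") if t in wanted)

lst = ["USA", "IND", "DEN"]
sample = "USA|UK|USA|IND"  # hypothetical row with a repeated code
print(count_unique(sample, lst))  # 2
print(count_all(sample, lst))     # 3
```

On the four rows from the question both rules give the same counts, since no code is repeated within a row.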