I am trying to optimize the code below (PySpark UDF).
It gives me the desired result (based on my data set) but it's too slow on very large datasets (approx. 180M).
The results (accuracy) are better than available Python modules (e.g. geotext, hdx-python-country). So I'm not looking for another module.
DataFrame:
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"]
]).toDF("address","name")
regex.csv:
iso2;keywords
US;\bArizona\b
US;\bTexas\b
US;\bFlorida\b
US;\bChicago\b
US;\bAmsterdam\b
US;\bProsper\b
US;\bUS$
CA;\bAlberta\b
CA;\bNova Scotia\b
CA;\bNova Scotia\b
CA;\bWhitehorse\b
CA;\bCA$
NL;\bAmsterdam\b
NL;\bNetherlands\b
NL;\bNL$
......<many, many more>
Creating a Pandas DataFrame from regex.csv, grouping by iso2 and joining the keywords (\bArizona\b|\bTexas\b|\bFlorida\b|\bUS$):
import re
import pandas as pd

df = pd.read_csv('regex.csv', sep=';')
df_regex = df.groupby('iso2').agg({'keywords': '|'.join }).reset_index()
Function:
def get_iso2(x):
    iso2 = {}
    for j, row in df_regex.iterrows():
        regex = re.compile(row['keywords'], re.I | re.M)
        matches = re.finditer(regex, x)
        for m in matches:
            iso2[row['iso2']] = iso2.get(row['iso2'], 0) + 1
    return [key for key, value in iso2.items() for _ in range(value)]
PySpark UDF:
from pyspark.sql import functions as F, types as T

get_iso2_udf = F.udf(get_iso2, T.ArrayType(T.StringType()))
Create new column:
df_new = df.withColumn('iso2', get_iso2_udf('address'))
Expected sample output:
[US,US,NL]
[CA]
[BE,BE,AU]
Some places occur in more than one country (the input is an address column containing city, province, state, country, ...).
Sample:
3030 Whispering Pines Circle, Prosper Texas, US -> [US,US,US]
Kalverstraat Amsterdam -> [US,NL]
Kalverstraat Amsterdam, Netherlands -> [US, NL, NL]
Maybe using Scala UDFs in PySpark is an option, but I have no idea how.
Your optimisation recommendations are highly appreciated!
IIUC, you can try the following steps without using a UDF:
from pyspark.sql.functions import expr, first, collect_list, broadcast, monotonically_increasing_id, flatten
import pandas as pd
df = spark.createDataFrame([
["3030 Whispering Pines Circle, Prosper Texas, US","John"],
["Kalverstraat Amsterdam","Mary"],
["Kalverstraat Amsterdam, Netherlands","Lex"],
["xvcv", "ddd"]
]).toDF("address","name")
Step-1: convert df_regex to a Spark dataframe df1 and add a unique id to df.
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
# upper-case the keywords to match upper(address), leaving regex escapes (e.g. \b) untouched:
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(\\.|^)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), regex=True)
# create regex patterns:
df_regex = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)}).reset_index()
df1 = spark.createDataFrame(df_regex)
df1.show(truncate=False)
+----+---------------------------------------------------------------------------------+
|iso2|keywords |
+----+---------------------------------------------------------------------------------+
|CA |(?m)\bALBERTA\b|\bNOVA SCOTIA\b|\bWHITEHORSE\b|\bCA$ |
|NL |(?m)\bAMSTERDAM\b|\bNETHERLANDS\b|\bNL$ |
|US |(?m)\bARIZONA\b|\bTEXAS\b|\bFLORIDA\b|\bCHICAGO\b|\bAMSTERDAM\b|\bPROSPER\b|\bUS$|
+----+---------------------------------------------------------------------------------+
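As a quick, purely illustrative check, this is what the case conversion does to a single keyword when the same substitution is applied directly with re.sub:

import re

# escapes such as \b keep their (significant) lower case; the rest is upper-cased
print(re.sub(r'(\\.|^)([^\\]*)', lambda m: m.group(1) + m.group(2).upper(), r'\bNova Scotia\b'))
# \bNOVA SCOTIA\b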
df = df.withColumn('id', monotonically_increasing_id())
df.show(truncate=False)
+-----------------------------------------------+----+---+
|address |name|id |
+-----------------------------------------------+----+---+
|3030 Whispering Pines Circle, Prosper Texas, US|John|0 |
|Kalverstraat Amsterdam |Mary|1 |
|Kalverstraat Amsterdam, Netherlands |Lex |2 |
|xvcv |ddd |3 |
+-----------------------------------------------+----+---+
Step-2: left join the regex dataframe df1 to df using rlike:
df2 = df.alias('d1').join(broadcast(df1.alias('d2')), expr("upper(d1.address) rlike d2.keywords"), "left")
df2.show()
+--------------------+----+---+----+--------------------+
| address|name| id|iso2| keywords|
+--------------------+----+---+----+--------------------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...|
| xvcv| ddd| 3|null| null|
+--------------------+----+---+----+--------------------+
Step-3: count the number of matches of d2.keywords in d1.address by splitting d1.address on d2.keywords and then subtracting 1 from the size of the resulting array:
df3 = df2.withColumn('num_matches', expr("size(split(upper(d1.address), d2.keywords))-1"))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| -2|
+--------------------+----+---+----+--------------------+-----------+
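A quick way to see why this works: splitting a string on a pattern yields one more piece than the number of matches (hence the -1), and size() of a NULL array returns -1 under Spark's default sizeOfNull behaviour, which explains the -2 for the unmatched row. A minimal, purely illustrative sketch (assuming an active SparkSession named spark):

from pyspark.sql.functions import col, size, split

demo = spark.createDataFrame([("PROSPER TEXAS, US",), (None,)], "addr string")
demo.withColumn(
    "num_matches",
    size(split(col("addr"), r"\bTEXAS\b|\bPROSPER\b|\bUS$")) - 1  # N matches -> N+1 pieces
).show()
# expected: 3 for the first row and -2 for the NULL row, which
# array_repeat() in Step-4 then turns into an empty array.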
Step-4: use array_repeat to repeat the value of iso2 num_matches times (requires Spark 2.4+):
df4 = df3.withColumn("iso2", expr("array_repeat(iso2, num_matches)"))
+--------------------+----+---+------------+--------------------+-----------+
| address|name| id| iso2| keywords|num_matches|
+--------------------+----+---+------------+--------------------+-----------+
|3030 Whispering P...|John| 0|[US, US, US]|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| [NL]|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| [US]|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| [NL, NL]|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| [US]|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3| []| null| -2|
+--------------------+----+---+------------+--------------------+-----------+
Step-5: groupby and do the aggregation:
df_new = df4 \
.groupby('id') \
.agg(
first('address').alias('address'),
first('name').alias('name'),
flatten(collect_list('iso2')).alias('countries')
)
+---+--------------------+----+------------+
| id| address|name| countries|
+---+--------------------+----+------------+
| 0|3030 Whispering P...|John|[US, US, US]|
| 1|Kalverstraat Amst...|Mary| [NL, US]|
| 3| xvcv| ddd| []|
| 2|Kalverstraat Amst...| Lex|[NL, NL, US]|
+---+--------------------+----+------------+
Alternative: Step-3 can also be handled by a pandas_udf:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pandas import Series
import re
@pandas_udf("int", PandasUDFType.SCALAR)
def get_num_matches(addr, ptn):
    return Series([0 if p is None else len(re.findall(p, s)) for p, s in zip(ptn, addr)])
df3 = df2.withColumn("num_matches", get_num_matches(expr('upper(address)'), 'keywords'))
+--------------------+----+---+----+--------------------+-----------+
| address|name| id|iso2| keywords|num_matches|
+--------------------+----+---+----+--------------------+-----------+
|3030 Whispering P...|John| 0| US|(?m)\bARIZONA\b|\...| 3|
|Kalverstraat Amst...|Mary| 1| NL|(?m)\bAMSTERDAM\b...| 1|
|Kalverstraat Amst...|Mary| 1| US|(?m)\bARIZONA\b|\...| 1|
|Kalverstraat Amst...| Lex| 2| NL|(?m)\bAMSTERDAM\b...| 2|
|Kalverstraat Amst...| Lex| 2| US|(?m)\bARIZONA\b|\...| 1|
| xvcv| ddd| 3|null| null| 0|
+--------------------+----+---+----+--------------------+-----------+
Notes:
- The keywords are converted to upper case to match upper(address); anything prefixed with a backslash (i.e. \b, \B, \A, \z) is excluded from the conversion, since changing the case of these escapes would change their meaning.
- rlike and regexp_replace are Java-based, while in pandas_udf the regex is Python-based, which might lead to slight differences when setting up the patterns in regex.csv.
- As using join and groupby triggers data shuffling, the above method could be slow. Just one more option for your testing:
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = spark.sparkContext.broadcast(
df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
)
df_ptn.value
#{'CA': '(?m)\\bALBERTA\\b|\\bNOVA SCOTIA\\b|\\bNOVA SCOTIA\\b|\\bWHITEHORSE\\b|\\bCA$',
# 'NL': '(?m)\\bAMSTERDAM\\b|\\bNETHERLANDS\\b|\\bNL$',
# 'US': '(?m)\\bARIZONA\\b|\\bTEXAS\\b|\\bFLORIDA\\b|\\bCHICAGO\\b|\\bAMSTERDAM\\b|\\bPROSPER\\b|\\bUS$'}
# REF: https://stackoverflow.com/questions/952914/how-to-make-a-flat-list-out-of-list-of-lists
import re
from operator import iconcat
from functools import reduce
from pandas import Series
from pyspark.sql.functions import pandas_udf, PandasUDFType, flatten
def __get_iso2(addr, ptn):
    return Series([reduce(iconcat, [[k]*len(re.findall(v, s)) for k, v in ptn.value.items()]) for s in addr])
get_iso2 = pandas_udf(lambda x:__get_iso2(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2(expr("upper(address)"))).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+
Or return an array of arrays in pandas_udf (w/o reduce and iconcat) and do flatten with Spark:
def __get_iso2_2(addr, ptn):
    return Series([[[k]*len(re.findall(v, s)) for k, v in ptn.value.items()] for s in addr])
get_iso2_2 = pandas_udf(lambda x:__get_iso2_2(x, df_ptn), "array<array<string>>", PandasUDFType.SCALAR)
df.withColumn('iso2', flatten(get_iso2_2(expr("upper(address)")))).show()
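On Spark 3.0+, where PandasUDFType is deprecated in favour of Python type hints, the same broadcast-based UDF could also be written as below; this is a minimal sketch that reuses the df_ptn broadcast from above (the name get_iso2_v3 is just illustrative):

import re
import pandas as pd
from functools import reduce
from operator import iconcat
from pyspark.sql.functions import expr, pandas_udf

@pandas_udf("array<string>")
def get_iso2_v3(addr: pd.Series) -> pd.Series:
    # same logic as __get_iso2 above, expressed with Spark 3 type hints
    return pd.Series([
        reduce(iconcat, [[k] * len(re.findall(v, s)) for k, v in df_ptn.value.items()])
        for s in addr
    ])

df.withColumn('iso2', get_iso2_v3(expr("upper(address)"))).show()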
Update: to find unique countries, do the following:
def __get_iso2_3(addr, ptn):
    return Series([[k for k, v in ptn.value.items() if re.search(v, s)] for s in addr])
get_iso2_3 = pandas_udf(lambda x:__get_iso2_3(x, df_ptn), "array<string>", PandasUDFType.SCALAR)
df.withColumn('iso2', get_iso2_3(expr("upper(address)"))).show()
+--------------------+----+--------+
| address|name| iso2|
+--------------------+----+--------+
|3030 Whispering P...|John| [US]|
|Kalverstraat Amst...|Mary|[NL, US]|
|Kalverstraat Amst...| Lex|[NL, US]|
| xvcv| ddd| []|
+--------------------+----+--------+
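If you already have the repeated arrays from Step-5 (df_new), a similar unique-country result could also be obtained on the Spark side with array_distinct (Spark 2.4+); a minimal sketch (element order may differ slightly from the pandas_udf version):

from pyspark.sql.functions import array_distinct

df_new.withColumn('countries', array_distinct('countries')).show()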
Similar to @CronosNull's method, in case the list of patterns in regex.csv is manageable, you can handle this with a list comprehension:
from pyspark.sql.functions import size, split, upper, col, array, expr, flatten
df_regex = pd.read_csv("file:///path/to/regex.csv", sep=";")
df_regex["keywords"] = df_regex["keywords"].str.replace(r'(^|\\.)([^\\]*)', lambda m: m.group(1) + m.group(2).upper())
df_ptn = df_regex.groupby('iso2').agg({'keywords':lambda x: '(?m)' + '|'.join(x)})["keywords"].to_dict()
df1 = df.select("*", *[ (size(split(upper(col('address')), v))-1).alias(k) for k,v in df_ptn.items()])
df1.select(*df.columns, flatten(array(*[ expr("array_repeat('{0}',`{0}`)".format(c)) for c in df_ptn.keys() ])).alias('iso2')).show()
+--------------------+----+---+------------+
| address|name| id| iso2|
+--------------------+----+---+------------+
|3030 Whispering P...|John| 0|[US, US, US]|
|Kalverstraat Amst...|Mary| 1| [NL, US]|
|Kalverstraat Amst...| Lex| 2|[NL, NL, US]|
| xvcv| ddd| 3| []|
+--------------------+----+---+------------+