Problem:
I would like to make a spatial join between:
- a large Spark DataFrame of points (traces), with longitude and latitude columns
- a much smaller GeoPandas GeoDataFrame of polygons (communes)
Here is what I have so far, which I find to be slow (a lot of scheduler delay, maybe due to the fact that communes is not broadcast):
import geopandas as gpd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# schema_out, columns and the communes GeoDataFrame are defined elsewhere
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
    # Build point geometries from the longitude/latitude columns
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs=communes.crs)
    joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
    return joined_df[columns]
The pandas_udf takes in a chunk of the points DataFrame (traces) as a pandas DataFrame, turns it into a GeoDataFrame with GeoPandas, and performs the spatial join with the polygons GeoDataFrame (thereby benefiting from GeoPandas' R-tree spatial join).
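For context, this is how such a grouped-map UDF gets applied; a minimal sketch, assuming the traces Spark DataFrame carries some grouping column (the name 'chunk_id' is a hypothetical placeholder, not from the original post):

# Hypothetical invocation: Spark runs the UDF once per group of traces
result = traces.groupby('chunk_id').apply(join_communes)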
Questions:
Is there a way to make it faster? I understand that my communes GeoDataFrame is in the Spark driver's memory and that each worker has to download it for each call to the UDF, is this correct?
However, I do not know how I could make this GeoDataFrame available directly to the workers (as in a broadcast join).
Any ideas ?
A year later, here is what I ended up doing, as @ndricca suggested. The trick is to broadcast the communes, but you can't broadcast a GeoDataFrame directly, so you have to load it as a Spark DataFrame, then convert it to JSON before broadcasting it. You then rebuild the GeoDataFrame inside the UDF using shapely.wkt (Well-Known Text: a way to encode geometric objects as text).
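To illustrate the WKT round trip itself (plain shapely, no Spark involved):

from shapely import wkt
from shapely.geometry import Point

p = Point(2.35, 48.85)
text = p.wkt               # 'POINT (2.35 48.85)'
p2 = wkt.loads(text)       # back to a shapely geometry
assert p.equals(p2)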
Another trick is to use a salt in the groupby to ensure an even repartition of the data across the cluster, as sketched below.
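Here is a sketch of what creating such a salt column could look like (the column name 'salt' and the bucket count are assumptions, not from the original answer):

from pyspark.sql import functions as F

n_buckets = 200  # assumption: roughly match the cluster's parallelism
traces = traces.withColumn('salt', (F.rand() * n_buckets).cast('int'))
salt = 'salt'  # column name used in the groupby below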
import json
import geopandas as gpd
import pandas as pd
from shapely import wkt
from pyspark.sql.functions import pandas_udf, PandasUDFType

communes = gpd.read_file('...communes.geojson')
crs = communes.crs
# Serialize the polygons to WKT so they survive the round trip through Spark
communes['wkt'] = communes.geometry.apply(lambda geom: geom.wkt)
# Use a previously created spark session
traces = spark_session.read.csv('trajectoires.csv', header=True)
# A GeoDataFrame cannot be broadcast directly: go through a Spark DataFrame and JSON
communes_spark = spark_session.createDataFrame(communes[['insee_comm', 'wkt']])
communes_json = communes_spark.toJSON().collect()
communes_bc = spark_session.sparkContext.broadcast(communes_json)
@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes_bc(traces):
    # Rebuild the communes GeoDataFrame inside the worker from the broadcast JSON
    communes = pd.DataFrame.from_records([json.loads(c) for c in communes_bc.value])
    polygons = [wkt.loads(w) for w in communes['wkt']]
    gdf_communes = gpd.GeoDataFrame(communes, geometry=polygons, crs=crs)
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs=crs)
    joined_df = gpd.sjoin(gdf_traces, gdf_communes, how='left', op='within')
    return joined_df[columns]
traces = traces.groupby(salt).apply(join_communes_bc)
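For completeness, schema_out and columns are defined outside the snippet in the original answer; a hypothetical example of what they might look like (the column names are assumptions):

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical output schema: it must declare every column the UDF returns
schema_out = StructType([
    StructField('longitude', DoubleType()),
    StructField('latitude', DoubleType()),
    StructField('insee_comm', StringType()),
])
columns = [field.name for field in schema_out.fields]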