
Spatial Join between pyspark dataframe and polygons (geopandas)

Problem:

I would like to make a spatial join between:

  • A big Spark DataFrame (500M rows) with points (e.g. points on a road)
  • A small GeoJSON file (20,000 shapes) with polygons (e.g. region boundaries)

Here is what I have so far, which I find to be slow (lots of scheduler delay, maybe due to the fact that communes is not broadcast):

@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes(traces):
    # communes, schema_out and columns are defined on the driver
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs=communes.crs)
    joined_df = gpd.sjoin(gdf_traces, communes, how='left', op='within')
    return joined_df[columns]

The pandas_udf takes a chunk of the points DataFrame (traces) as a pandas DataFrame, turns it into a GeoDataFrame with geopandas, and performs the spatial join with the polygons GeoDataFrame (thereby benefiting from geopandas' R-tree spatial index).
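For intuition, the `op='within'` predicate that `sjoin` evaluates boils down to a point-in-polygon test per row (geopandas delegates it to shapely, with the R-tree pruning candidates first). As a hedged, stdlib-only illustration, here is the classic ray-casting version of that test; `point_within` is a made-up helper name, not a geopandas API:

```python
# Toy ray-casting point-in-polygon test, the predicate behind sjoin's 'within'.
# Illustration only: real code should use shapely/geopandas, which also handle
# edge cases (points exactly on a boundary, self-intersecting rings, etc.).

def point_within(x, y, polygon):
    """Return True if (x, y) lies inside `polygon`, a list of (x, y) vertices."""
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]
        # Count how many edges a horizontal ray going right from (x, y) crosses;
        # an odd number of crossings means the point is inside.
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

square = [(0, 0), (2, 0), (2, 2), (0, 2)]
print(point_within(1, 1, square))   # → True
print(point_within(3, 1, square))   # → False
```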

Questions:

Is there a way to make it faster? I understand that my communes GeoDataFrame is in the Spark driver's memory and that each worker has to download it for each call to the UDF. Is this correct?

However, I do not know how I could make this GeoDataFrame available directly to the workers (as in a broadcast join).

Any ideas ?

asked Dec 02 '19 by Luis Blanche


1 Answer

A year later, here is what I ended up doing, as @ndricca suggested. The trick is to broadcast the communes, but you can't broadcast a GeoDataFrame directly, so you have to load it as a Spark DataFrame and convert it to JSON before broadcasting it. You then rebuild the GeoDataFrame inside the UDF using shapely.wkt (Well-Known Text: a way to encode geometric objects as text).
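To see why WKT is a convenient wire format here: it serialises a geometry as a plain string, which survives the DataFrame → JSON → broadcast round trip that a shapely object cannot. As a hedged, stdlib-only sketch (a toy parser handling only POINT; in real code `shapely.wkt.loads` and the geometry's `.wkt` attribute cover all geometry types):

```python
import re

# Toy WKT round trip for POINT geometries only, to illustrate why a 'wkt'
# string column can be broadcast while a shapely/GeoDataFrame object cannot.

def point_to_wkt(x, y):
    return f"POINT ({x} {y})"

def wkt_to_point(text):
    match = re.fullmatch(r"POINT \(([-\d.]+) ([-\d.]+)\)", text)
    return float(match.group(1)), float(match.group(2))

wkt_string = point_to_wkt(2.35, 48.85)
print(wkt_string)                 # → POINT (2.35 48.85)
print(wkt_to_point(wkt_string))   # → (2.35, 48.85)
```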

Another trick is to use a salt in the groupby to ensure an even distribution of the data across the cluster.
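The salting idea itself is independent of Spark: derive a small pseudo-random bucket per row so the groupby fans out into roughly equal groups instead of a few huge ones. A stdlib-only sketch of the idea (`salt_for` and the bucket count of 8 are arbitrary choices for illustration; in Spark the same effect can come from e.g. `(rand() * NUM_SALTS).cast('int')`):

```python
import zlib
from collections import Counter

NUM_SALTS = 8  # arbitrary; roughly a small multiple of the executor count

def salt_for(row_id):
    # Deterministic pseudo-random bucket derived from a row identifier.
    return zlib.crc32(str(row_id).encode()) % NUM_SALTS

# 8000 rows spread over the salt buckets end up roughly balanced.
counts = Counter(salt_for(i) for i in range(8000))
print(len(counts))   # → 8  (every bucket receives rows)
```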

import json

import geopandas as gpd
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from shapely import wkt

communes = gpd.read_file('...communes.geojson')
# Serialise the geometries to WKT so they survive the Spark/JSON round trip
communes['wkt'] = communes.geometry.apply(lambda geom: geom.wkt)
crs = communes.crs

# Use a previously created spark session
traces = spark.read.csv('trajectoires.csv', header=True)
communes_spark = spark.createDataFrame(communes[['insee_comm', 'wkt']])
communes_json = communes_spark.toJSON().collect()
communes_bc = spark.sparkContext.broadcast(communes_json)

@pandas_udf(schema_out, PandasUDFType.GROUPED_MAP)
def join_communes_bc(traces):
    # Rebuild the polygons GeoDataFrame from the broadcast JSON inside the UDF
    communes = pd.DataFrame.from_records([json.loads(c) for c in communes_bc.value])
    polygons = [wkt.loads(w) for w in communes['wkt']]
    gdf_communes = gpd.GeoDataFrame(communes, geometry=polygons, crs=crs)
    geometry = gpd.points_from_xy(traces['longitude'], traces['latitude'])
    gdf_traces = gpd.GeoDataFrame(traces, geometry=geometry, crs=crs)
    joined_df = gpd.sjoin(gdf_traces, gdf_communes, how='left', op='within')
    return joined_df[columns]

# salt is a column of small random integers added beforehand to balance groups
traces = traces.groupby(salt).apply(join_communes_bc)
answered Oct 23 '22 by Luis Blanche