
Using the PostGIS geometry type in an Apache Spark JDBC DataFrame

I would like to know whether I can use the PostGIS geometry type with Apache Spark SQL and DataFrames.

This is how far I got: I realized I could write a custom JDBC dialect and a user-defined type, which I called PostgisDialect and GeometryType. Here is my code:

import java.sql.{Connection, Types}

// JTS (for newer JTS releases the package is org.locationtech.jts)
import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.{WKBReader, WKBWriter}

import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

// Custom JDBC dialect that maps the PostGIS geometry type to the Catalyst UDT below.
object PostgisDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  // Postgres reports PostGIS geometry columns as Types.OTHER with type name "geometry".
  override def getCatalystType(
    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.OTHER) {
      toCatalystType(typeName)
    } else None
  }

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match {
    case "geometry" => Some(GeometryType)
    case _ => None
  }

  // Map our UDT back to the database-side geometry type on write.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case GeometryType => Some(JdbcType("geometry", Types.OTHER))
    case _ => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    super.beforeFetch(connection, properties)

    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
      connection.setAutoCommit(false)
    }

  }

}

class GeometryType private() extends UserDefinedType[Geometry] {

  // Internally a geometry is stored as WKB bytes.
  override def sqlType: DataType = BinaryType

  override def pyUDT: String = "my.types.GeometryType"

  override def serialize(obj: Any): GenericArrayData = {
    obj match {
      case p: Geometry =>
        val output = (new WKBWriter).write(p)
        new GenericArrayData(output)
    }
  }

  override def deserialize(datum: Any): Geometry = {
    datum match {
      case values: Array[Byte] => (new WKBReader).read(values)
    }
  }

  override def userClass: Class[Geometry] = classOf[Geometry]

  override def asNullable: GeometryType = this
}

case object GeometryType extends GeometryType
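
For completeness, here is roughly how I register the dialect and load the table (a minimal sketch; the connection details and table name below are placeholders):

// Register the custom dialect so Spark uses it for jdbc:postgresql URLs.
import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(PostgisDialect)

// Placeholder connection details and table name.
val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "public.my_geometries")
  .option("driver", "org.postgresql.Driver")
  .load()

df.printSchema() // the schema shows the geometry column with my GeometryType
df.show()        // but any action that actually reads rows fails (see below)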

So far so good, until JDBCRDD calls the method getConversions:

/**
 * Maps a StructType to a type tag list.
 */
def getConversions(schema: StructType): Array[JDBCConversion] =
    schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))

private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match {
    case BooleanType => BooleanConversion
    case DateType => DateConversion
    case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
    case DoubleType => DoubleConversion
    case FloatType => FloatConversion
    case IntegerType => IntegerConversion
    case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion
    case StringType => StringConversion
    case TimestampType => TimestampConversion
    case BinaryType => BinaryConversion
    case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
    case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")
  }

Of course, there is no conversion for my custom type:

Caused by: java.lang.IllegalArgumentException: Unsupported type geometry
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getConversions(JDBCRDD.scala:351)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.getConversions(JDBCRDD.scala:337)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:385)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:359)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
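
As a workaround I could push the WKB conversion into the query on the PostgreSQL side, since BinaryType is in the conversion list. A sketch (table and column names are placeholders):

import com.vividsolutions.jts.io.WKBReader

// Push the geometry-to-WKB conversion into the query itself, so the JDBC side
// only ever sees a bytea / BinaryType column.
val query = "(SELECT id, ST_AsBinary(geom) AS geom_wkb FROM public.my_geometries) AS t"

val raw = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", query)
  .load()

// Deserialize the WKB bytes back into JTS geometries on the Spark side.
val geometries = raw.select("geom_wkb").rdd.map { row =>
  (new WKBReader).read(row.getAs[Array[Byte]]("geom_wkb"))
}

That avoids the error, but it loses the Geometry type in the DataFrame itself, which is what I am after.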

Is there a way to register a conversion for my custom type?



1 Answer

Spark does not have built-in functions for processing geometry data, but the two libraries below were created to work with the geometry data type. Both provide functions you can use to process geometries.

  1. Apache Sedona - https://sedona.apache.org/tutorial/sql/
  2. GeoMesa - https://www.geomesa.org/documentation/stable/user/spark/index.html

I recently used both of these libraries to simplify and merge geometries and load the results into PostgreSQL.

Here are some suggestions:

1. Go through the documentation and check whether the functions you need exist in either library, and use one library at a time, since installing both on the same cluster can cause issues.

2. Different releases are compatible with different Spark versions; you can check which release matches the Spark version you are using here: https://sedona.apache.org/download/overview/

3. Follow the steps given in this tutorial: https://sedona.apache.org/tutorial/sql/

After "Register SedonaSQL"step, you can cross check by running below command if all the functions are available for the use.

spark.catalog.listFunctions().show()
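
For example, in Scala the registration and a first query look roughly like this (a sketch based on the Sedona SQL tutorial; the connection details, table and column names are placeholders, and the exact registration call can differ between Sedona versions):

// Register Sedona's ST_* functions on the SparkSession.
import org.apache.sedona.sql.utils.SedonaSQLRegistrator

SedonaSQLRegistrator.registerAll(spark)
spark.catalog.listFunctions().show()   // cross-check that the ST_* functions are registered

// Read the geometry column from PostGIS as WKB over JDBC (placeholder connection details),
// then turn it back into a geometry with Sedona's ST_GeomFromWKB.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "(SELECT id, ST_AsBinary(geom) AS geom_wkb FROM public.my_geometries) AS t")
  .load()

df.createOrReplaceTempView("geoms")
spark.sql("SELECT id, ST_GeomFromWKB(geom_wkb) AS geom FROM geoms").show()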

Thanks.
