I would like to know whether I can use the PostGIS geometry type in Apache Spark SQL and DataFrames.
I got this far: I wrote a custom JDBC dialect and a user-defined type, which I called PostgisDialect and GeometryType. Here is my code:
import java.sql.{Connection, Types}

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types.{DataType, MetadataBuilder}

object PostgisDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  // Map the PostGIS "geometry" column (reported as Types.OTHER) to the custom GeometryType.
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.OTHER) {
      toCatalystType(typeName)
    } else None
  }

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match {
    case "geometry" => Some(GeometryType)
    case _ => None
  }

  // Map the custom GeometryType back to the PostGIS "geometry" type when writing.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case GeometryType => Some(JdbcType("geometry", Types.OTHER))
    case _ => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  // Same as PostgresDialect: disable autocommit so a fetchsize-based cursor is used.
  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    super.beforeFetch(connection, properties)
    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
      connection.setAutoCommit(false)
    }
  }
}
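For completeness, the dialect is registered through Spark's standard hook, roughly like this:

import org.apache.spark.sql.jdbc.JdbcDialects

// Make Spark consult PostgisDialect for jdbc:postgresql URLs.
JdbcDialects.registerDialect(PostgisDialect)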
import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}
// JTS types (org.locationtech.jts in newer JTS releases).
import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.{WKBReader, WKBWriter}

class GeometryType private() extends UserDefinedType[Geometry] {

  // Geometries are stored as WKB, i.e. as a binary column.
  override def sqlType: DataType = BinaryType

  override def pyUDT: String = "my.types.GeometryType"

  override def serialize(obj: Any): GenericArrayData = {
    obj match {
      case p: Geometry =>
        val output = (new WKBWriter).write(p)
        new GenericArrayData(output)
    }
  }

  override def deserialize(datum: Any): Geometry = {
    datum match {
      case values: Array[Byte] => (new WKBReader).read(values)
    }
  }

  override def userClass: Class[Geometry] = classOf[Geometry]

  override def asNullable: GeometryType = this
}

case object GeometryType extends GeometryType
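The table is then read over JDBC in the usual way, something like the following (connection details and names are placeholders):

// Placeholder URL, table and driver; the table has a PostGIS geometry column.
val df = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "public.my_geometries")
  .option("driver", "org.postgresql.Driver")
  .load()

df.printSchema() // the geometry column comes back with the custom GeometryType
df.show()        // forces JDBCRDD.compute, which is where things go wrong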
So far so good, but then the JDBCRDD calls the method getConversions:
/**
 * Maps a StructType to a type tag list.
 */
def getConversions(schema: StructType): Array[JDBCConversion] =
  schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))

private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match {
  case BooleanType => BooleanConversion
  case DateType => DateConversion
  case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
  case DoubleType => DoubleConversion
  case FloatType => FloatConversion
  case IntegerType => IntegerConversion
  case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion
  case StringType => StringConversion
  case TimestampType => TimestampConversion
  case BinaryType => BinaryConversion
  case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
  case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")
}
Of course, there is no conversion for my custom type, so I get:
Caused by: java.lang.IllegalArgumentException: Unsupported type geometry
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getConversions(JDBCRDD.scala:351)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.getConversions(JDBCRDD.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:385)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:359)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Is there a way to register a conversion for my custom type?
Spark doesn't have built-in functions for processing geometry data, but there are libraries built to work with the geometry data type, and they provide functions you can use to process it.
I recently used these libraries to simplify and merge geometries and load the result into PostgreSQL; a rough sketch of that workflow follows.
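For illustration only, here is a minimal sketch of that kind of pipeline using Sedona's SQL functions. The input view, the WKT column, the simplification tolerance and the JDBC settings are all placeholders, and Sedona must already be registered on the session (see suggestion 3 below):

// Simplify each geometry, merge them into one, and convert the result back to WKT.
val merged = spark.sql(
  """
    |SELECT ST_AsText(
    |         ST_Union_Aggr(ST_SimplifyPreserveTopology(ST_GeomFromWKT(wkt), 0.001))
    |       ) AS merged_wkt
    |FROM   input_geometries
  """.stripMargin)

// Write the result back to PostgreSQL; PostGIS can cast the WKT to geometry on its side.
merged.write
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "public.merged_geometries")
  .option("driver", "org.postgresql.Driver")
  .mode("append")
  .save()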
Here are some suggestions:
1. Go through the documentation and check whether the functions you need are available, and use one library at a time, since installing more than one on the same cluster can cause issues.
2. Different releases are compatible with different Spark versions; you can check which release matches the Spark version you are using here: https://sedona.apache.org/download/overview/
3. Follow the steps given in this tutorial: https://sedona.apache.org/tutorial/sql/
After the "Register SedonaSQL" step, you can cross-check that all the functions are available by running the command below (a short registration sketch follows it):
spark.catalog.listFunctions().show()
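For reference, the registration step and a quick smoke test look roughly like this in Scala (a sketch, assuming the Sedona artifacts matching your Spark version are already installed as described in the tutorial above):

import org.apache.sedona.sql.utils.SedonaSQLRegistrator

// The "Register SedonaSQL" step from the tutorial.
SedonaSQLRegistrator.registerAll(spark)

// Quick check: parse a WKT polygon and compute its area with Sedona's ST_ functions.
spark.sql("SELECT ST_Area(ST_GeomFromWKT('POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))')) AS area").show()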
Thanks.