I'm trying to write a Spark Dataset into an existing PostgreSQL table (I can't change the table metadata, such as column types). One of the columns of this table is of type hstore, and it's causing trouble.
I see the following exception when I launch the write (here the original map is empty, so escaping it gives an empty string):
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause.
at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
at org.postgresql.core.v3.QueryExecutorImpl$1.handleError(QueryExecutorImpl.java:419)
at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:300)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:299)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying
This is how I'm doing it:
// Render a Map as hstore text input: quoted keys, unquoted values, pairs separated by commas
def escapePgHstore[A, B](hmap: Map[A, B]) = {
  hmap.map { case (key, value) => s""" "${key}"=>${value} """ }.mkString(",")
}
...
val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")
ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1).write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)
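The escapePgHstore helper above quotes keys but leaves values bare and does not escape embedded double quotes or backslashes, so it only works for simple keys and numeric values. Below is a minimal Python sketch of the same conversion following the hstore input syntax from the PostgreSQL documentation (keys and values in double quotes, `"` and `\` escaped with a backslash, a NULL value written as bare NULL). The name to_hstore_literal is hypothetical. Note that correct escaping alone does not fix the "character varying" error: the string still has to reach PostgreSQL as an untyped value, because there is no implicit cast from varchar to hstore.

```python
from typing import Mapping, Optional


def _quote_hstore_token(text: str) -> str:
    # Wrap in double quotes and backslash-escape embedded backslashes and quotes.
    escaped = text.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


def to_hstore_literal(mapping: Mapping[object, Optional[object]]) -> str:
    """Render a dict as hstore text input, e.g. {"a": 1} -> '"a"=>"1"'.

    Keys and values are converted with str(); a None value becomes the
    unquoted NULL keyword. An empty dict yields '', the empty hstore.
    """
    pairs = []
    for key, value in mapping.items():
        if key is None:
            # hstore allows NULL values but not NULL keys.
            raise ValueError("hstore keys cannot be NULL")
        rendered = "NULL" if value is None else _quote_hstore_token(str(value))
        pairs.append(f"{_quote_hstore_token(str(key))}=>{rendered}")
    return ",".join(pairs)
```

Because Spark binds the value as a JDBC parameter, no SQL-level single-quote doubling is needed; only the hstore-level escaping above applies.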
If I don't convert original_column from Map[String, Long] to String using escapePgHstore, I see the following error instead:
java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType$2.apply(JdbcUtils.scala:137)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:293)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$7.apply(JdbcUtils.scala:292)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
What's the right way to make Spark write a valid hstore value?
Comment: hstore is deprecated. Use jsonb.
Comment: @danger89 Actually, it's not formally deprecated, though I don't think there's any reason to use it in favour of jsonb anymore.
This module implements the hstore data type for storing sets of key/value pairs within a single PostgreSQL value. This can be useful in various scenarios, such as rows with many attributes that are rarely examined, or semi-structured data. Keys and values are simply text strings.
It turns out I just have to let Postgres infer the appropriate type of my column, by setting stringtype to unspecified in the connection properties, as described in the official PostgreSQL JDBC driver documentation:
props.put("stringtype", "unspecified")
Now it works perfectly!
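With stringtype set to unspecified, the PostgreSQL JDBC driver sends parameters bound via setString() as untyped values instead of varchar, and the server infers the type from the target column, just as it would for an untyped literal. The driver also reads connection properties from the URL query string, so the same setting can go into the JDBC URL itself. A minimal sketch, assuming a plain jdbc:postgresql:// URL; the helper name with_stringtype_unspecified is hypothetical and does not check whether stringtype is already present.

```python
from urllib.parse import urlencode


def with_stringtype_unspecified(jdbc_url: str) -> str:
    """Append stringtype=unspecified to a PostgreSQL JDBC URL.

    Equivalent to props.put("stringtype", "unspecified") on the
    java.util.Properties passed to DataFrameWriter.jdbc.
    """
    # Use '&' if the URL already carries query parameters, '?' otherwise.
    separator = "&" if "?" in jdbc_url else "?"
    return jdbc_url + separator + urlencode({"stringtype": "unspecified"})
```

For example, jdbc:postgresql://localhost:5432/mydb becomes jdbc:postgresql://localhost:5432/mydb?stringtype=unspecified.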
Here is PySpark code for writing a DataFrame to a Postgres table that has hstore, json and jsonb columns. In general, for any complex data type created in Postgres that cannot be represented in a Spark DataFrame, you need to specify stringtype="unspecified" in the options or properties passed to whichever function writes the DataFrame to SQL.
Below is an example of writing a Spark DataFrame to a PostgreSQL table using write():
dataframe.write.format('jdbc').options(
    driver=driver,
    user=username,
    password=password,
    url=target_database_url,
    dbtable=table,
    stringtype="unspecified",
).mode("append").save()
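With stringtype="unspecified" set, json and jsonb columns are filled from ordinary string columns, so each value only has to be valid JSON text. A minimal sketch, assuming the data starts out as Python objects (for example before calling spark.createDataFrame); the helper name to_json_column_value is hypothetical. For an existing map or struct column, pyspark.sql.functions.to_json performs the equivalent conversion inside Spark.

```python
import json
from typing import Any, Optional


def to_json_column_value(value: Any) -> Optional[str]:
    """Serialize a Python object to JSON text for a json/jsonb column.

    None is kept as None so it is written as SQL NULL rather than the
    JSON literal 'null'.
    """
    if value is None:
        return None
    # Compact separators; ensure_ascii=False keeps non-ASCII text readable.
    return json.dumps(value, ensure_ascii=False, separators=(",", ":"))
```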