Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Read from a hive table and write back to it using spark sql

I am reading a Hive table using Spark SQL and assigning it to a scala val

val x = sqlContext.sql("select * from some_table")

Then I am doing some processing with the dataframe x and finally coming up with a dataframe y , which has the exact schema as the table some_table.

Finally I am trying to insert overwrite the y dataframe to the same hive table some_table

y.write.mode(SaveMode.Overwrite).saveAsTable().insertInto("some_table")

Then I am getting the error

org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from

I tried creating an insert sql statement and firing it using sqlContext.sql() but it too gave me the same error.

Is there any way I can bypass this error? I need to insert the records back to the same table.


Hi I tried doing as suggested , but still getting the same error .

val x = sqlContext.sql("select * from incremental.test2")
val y = x.limit(5)
y.registerTempTable("temp_table")
val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("incremental.test2")

scala> dy.write.mode("overwrite").insertInto("incremental.test2")
             org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
like image 407
Avi Avatar asked Aug 03 '16 14:08

Avi


People also ask

Can you use Spark SQL to access data from Hive?

Spark SQL also supports reading and writing data stored in Apache Hive. However, since Hive has a large number of dependencies, these dependencies are not included in the default Spark distribution. If Hive dependencies can be found on the classpath, Spark will load them automatically.

Does Spark SQL return a DataFrame?

The sql function on a SparkSession enables applications to run SQL queries programmatically and returns the result as a DataFrame .


4 Answers

Actually you can also use checkpointing to achieve this. Since it breaks data lineage, Spark is not able to detect that you are reading and overwriting in the same table:

 sqlContext.sparkContext.setCheckpointDir(checkpointDir)
 val ds = sqlContext.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")
like image 80
nsanglar Avatar answered Sep 16 '22 12:09

nsanglar


You should first save your DataFrame y in a temporary table

y.write.mode("overwrite").saveAsTable("temp_table")

Then you can overwrite rows in your target table

val dy = sqlContext.table("temp_table")
dy.write.mode("overwrite").insertInto("some_table")
like image 41
cheseaux Avatar answered Sep 19 '22 12:09

cheseaux


You should first save your DataFrame y like a parquet file:

y.write.parquet("temp_table")

After you load this like:

val parquetFile = sqlContext.read.parquet("temp_table")

And finish you insert your data in your table

parquetFile.write.insertInto("some_table")
like image 24
matteus silva Avatar answered Sep 16 '22 12:09

matteus silva


In context to Spark 2.2

  1. This error means that our process is reading from same table and writing to same table.
  2. Normally, this should work as process writes to directory .hiveStaging...
  3. This error occurs in case of saveAsTable method, as it overwrites entire table instead of individual partitions.
  4. This error should not occur with insertInto method, as it overwrites partitions not the table.
  5. A reason why this happening is because Hive table has following Spark TBLProperties in its definition. This problem will solve for insertInto method if you remove following Spark TBLProperties -

'spark.sql.partitionProvider' 'spark.sql.sources.provider' 'spark.sql.sources.schema.numPartCols 'spark.sql.sources.schema.numParts' 'spark.sql.sources.schema.part.0' 'spark.sql.sources.schema.part.1' 'spark.sql.sources.schema.part.2' 'spark.sql.sources.schema.partCol.0' 'spark.sql.sources.schema.partCol.1'

https://querydb.blogspot.com/2019/07/read-from-hive-table-and-write-back-to.html

when we upgraded our HDP to 2.6.3 The Spark was updated from 2.2 to 2.3 which resulted in below error -

Caused by: org.apache.spark.sql.AnalysisException: Cannot overwrite a path that is also being read from.;

at org.apache.spark.sql.execution.command.DDLUtils$.verifyNotReadPath(ddl.scala:906)

This error occurs for job where-in we are reading and writing to same path. Like Jobs with SCD Logic

Solution -

  1. Set --conf "spark.sql.hive.convertMetastoreOrc=false"
  2. or, update the job such that it writes data to a temporary table. Then reads from temporary table and insert it into final table.

https://querydb.blogspot.com/2020/09/orgapachesparksqlanalysisexception.html

like image 31
dinesh028 Avatar answered Sep 16 '22 12:09

dinesh028