How to insert a Spark Structured Streaming DataFrame into a Hive external table/location?

I have a question about integrating Spark Structured Streaming with a Hive table.

I have been trying out some Spark Structured Streaming examples.

Here is my example:

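 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.types.StructType

 val spark = SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the streaming DataFrame as a temporary view named "updates"
 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
 csvDF.createOrReplaceTempView("updates")

 // This batch INSERT reads from a streaming source and throws the exception below
 val query = spark.sql("insert into table_abcd select * from updates")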


As you can see, in the last step, while writing the DataFrame to the HDFS location, the data is not inserted into the existing directory (my existing directory has some old data partitioned by "age").

I am getting the following error:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Why am I not able to insert data into the existing directory in the HDFS location? Or is there another way to perform an "insert into" operation on a Hive table?

I am looking for a solution.

Answer

On HDP 3.1 with Spark 2.3.2 and Hive 3.1.0, we have used Hortonworks' spark-llap library to write a Structured Streaming DataFrame from Spark to Hive. On GitHub you will find some documentation on its usage.

The required library, hive-warehouse-connector-assembly-, is available on Maven and needs to be passed to the spark-submit command (for example with --jars). There are many more recent versions of that library, although I haven't had the chance to test them.

After creating the Hive table manually (e.g. through Beeline or the Hive shell), you can apply the following code:

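import com.hortonworks.hwc.HiveWarehouseSession

val csvDF = spark.readStream.[...].load()

// STREAM_TO_STREAM selects the connector's Hive streaming data source;
// the target table must already exist in Hive
val query = csvDF.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "database_name")
  .option("table", "table_name")
  // spark.conf.get throws NoSuchElementException if this key is not set
  .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()

query.awaitTermination()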
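If you are on Spark 2.4 or later (so not the Spark 2.3.2 that ships with HDP 3.1), a possible alternative that does not need the connector is `foreachBatch`. Each micro-batch is handed to your function as a static DataFrame, so the ordinary batch writer (`insertInto`) is allowed there and the AnalysisException from the question does not occur. This is only a sketch under these assumptions: `table_abcd` already exists with columns `(name, age)` and is partitioned by `age`; the object name `StreamToHive`, the helpers `insertBatch`/`startInsertStream`, and the checkpoint path are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

object StreamToHive {
  // Appends one micro-batch to an existing table. insertInto matches columns
  // by position, so the DataFrame column order must equal the table's
  // (here: name, then the partition column age last).
  def insertBatch(table: String)(batchDF: DataFrame, batchId: Long): Unit =
    batchDF.write.mode("append").insertInto(table)

  // Starts the streaming query; explicit lambda parameter types keep the
  // call unambiguous between the Scala and Java foreachBatch overloads.
  def startInsertStream(source: DataFrame, table: String, checkpointDir: String): StreamingQuery =
    source.writeStream
      .option("checkpointLocation", checkpointDir)
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        insertBatch(table)(batchDF, batchId)
      }
      .start()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StatsAnalyzer")
      .enableHiveSupport()
      // Needed so Hive accepts dynamic partition values for "age"
      .config("hive.exec.dynamic.partition", "true")
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .getOrCreate()

    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val csvDF = spark.readStream
      .option("sep", ",")
      .schema(userSchema)
      .csv("file:///home/su/testdelta")

    // Hypothetical checkpoint directory; use a dedicated path per query
    val query = startInsertStream(csvDF, "table_abcd", "/tmp/checkpoints/table_abcd")
    query.awaitTermination()
  }
}
```

Note that `foreachBatch` only provides at-least-once guarantees, so after a failure a micro-batch may be inserted again; `batchId` can be used to detect and skip such repeats if duplicates matter.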

