How to insert a Spark Structured Streaming DataFrame into a Hive external table/location?

I have a question about Spark Structured Streaming integration with a Hive table.

I have tried out some examples of Spark Structured Streaming.

Here is my example:

 val spark = SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query = spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

As you can see in the last step, while writing the data frame to the HDFS location, the data is not getting inserted into the existing directory (my existing directory has some old data partitioned by "age").

I am getting

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

Can you help me understand why I am not able to insert data into the existing directory at the HDFS location? Or is there any other way to do an "insert into" operation on the Hive table?

Looking for a solution

asked Dec 28 '18 by BigD

1 Answer

On HDP 3.1 with Spark 2.3.2 and Hive 3.1.0 we have used Hortonworks' spark-llap library to write a Structured Streaming DataFrame from Spark to Hive. On GitHub you will find some documentation on its usage.

The required library hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar is available on Maven and needs to be passed to the spark-submit command. There are many more recent versions of that library, although I haven't had the chance to test them.
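
The assembly jar (and the metastore URI setting, which the code below reads back via spark.conf.get) would typically be supplied on the command line. A minimal sketch, assuming the usual HDP jar location and with a placeholder metastore host, application class and jar name, might look like:

    spark-submit \
      --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-1.0.0.3.1.0.0-78.jar \
      --conf spark.datasource.hive.warehouse.metastoreUri=thrift://<metastore-host>:9083 \
      --class com.example.StreamingToHive \
      streaming-app.jar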

After creating the Hive table manually (e.g. through beeline or the Hive shell; a possible DDL is sketched after the code below), you could apply the following code:

import com.hortonworks.hwc.HiveWarehouseSession

val csvDF = spark.readStream.[...].load()

val query = csvDF.writeStream
  .format(HiveWarehouseSession.STREAM_TO_STREAM)
  .option("database", "database_name")
  .option("table", "table_name")
  .option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .start()

query.awaitTermination()
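
For reference, a hypothetical DDL for the table from the question (the name/age layout, partitioned by age) could look like the following; on HDP 3.1, managed tables are created as transactional ORC tables by default, which as far as I know is what the Hive streaming write behind the connector expects:

    CREATE TABLE table_abcd (
      name STRING
    )
    PARTITIONED BY (age INT)
    STORED AS ORC;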
answered Sep 28 '22 by Michael Heil