 

Spark Structured Streaming writeStream to Hive ORC Partitioned External Table

I am trying to use Spark Structured Streaming - writeStream API to write to an External Partitioned Hive table.

CREATE EXTERNAL TABLE `XX`(
`a` string,
`b` string,
`c` string,
`happened` timestamp,
`processed` timestamp,
`d` string,
`e` string,
`f` string )
 PARTITIONED BY (
`year` int, `month` int, `day` int)      
 CLUSTERED BY (d)
INTO 6 BUCKETS
STORED AS ORC 
TBLPROPERTIES (
'orc.compress'='ZLIB',
'orc.compression.strategy'='SPEED',
'orc.create.index'='true',
'orc.encoding.strategy'='SPEED');

and in Spark code,

val hiveOrcWriter: DataStreamWriter[Row] = event_stream
  .writeStream
  .outputMode("append")
  .format("orc")
  .partitionBy("year", "month", "day")
  //.option("compression", "zlib")
  .option("path", _table_loc)
  .option("checkpointLocation", _table_checkpoint)
// the query is subsequently started with hiveOrcWriter.start()
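For `partitionBy("year", "month", "day")` to work, `event_stream` must already carry those three columns, presumably derived from the `happened` timestamp in the table schema. A minimal sketch of that derivation as a plain function (the helper name is hypothetical; in the actual job the equivalent is usually done with `withColumn` and the `year()`/`month()`/`dayofmonth()` SQL functions before `writeStream`):

```scala
import java.sql.Timestamp

// Hypothetical helper illustrating how the partition values would be
// derived from the `happened` timestamp in the question's schema.
// In the streaming job this is typically expressed as
//   .withColumn("year", year($"happened")) etc. before writeStream.
def partitionValues(happened: Timestamp): (Int, Int, Int) = {
  val d = happened.toLocalDateTime.toLocalDate
  (d.getYear, d.getMonthValue, d.getDayOfMonth)
}
```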

I see that with a non-partitioned table, records are inserted into Hive. However, when using the partitioned table, the Spark job does not fail or raise exceptions, but no records are inserted into the Hive table.

I would appreciate comments from anyone who has dealt with similar problems.

Edit:

Just discovered that the .orc files are indeed written to HDFS, with the correct partition directory structure: e.g. /_table_loc/_table_name/year/month/day/part-0000-0123123.c000.snappy.orc

However

select * from XX limit 1; (or with where year=2018)

returns no rows.

The InputFormat and OutputFormat for the Table 'XX' are org.apache.hadoop.hive.ql.io.orc.OrcInputFormat and org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat respectively.

asked Aug 11 '18 by irrelevantUser



1 Answer

This feature isn't provided out of the box in Structured Streaming. In batch processing you would use dataset.write.saveAsTable(table_name), but that method isn't available on a DataStreamWriter.

After processing and saving the data to HDFS, you can update the partitions manually (or with a script that does so on a schedule):

If you use Hive

MSCK REPAIR TABLE table_name

If you use Impala

ALTER TABLE table_name RECOVER PARTITIONS
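MSCK REPAIR rescans the entire table directory, which can be slow on large tables; a lighter-weight alternative is to register each new partition explicitly. A minimal sketch using the question's table and partition columns (the helper itself is hypothetical; the generated statement would be executed via spark.sql(...), beeline, or the hive CLI):

```scala
// Hypothetical helper: build an explicit ADD PARTITION statement for one
// (year, month, day) triple, mirroring the question's partition columns.
// The resulting SQL is then run through spark.sql(...) or beeline.
def addPartitionSql(table: String, year: Int, month: Int, day: Int): String =
  s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION " +
  s"(year=$year, month=$month, day=$day)"
```

Note that both MSCK REPAIR and the metastore in general expect partition directories in the key=value form (e.g. year=2018/month=8/day=11), which is what Spark's partitionBy produces.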
answered Oct 13 '22 by Shikkou