I am trying to use Spark Structured Streaming - writeStream
API to write to an External Partitioned Hive table.
CREATE EXTERNAL TABLE `XX`(
`a` string,
`b` string,
`b` string,
`happened` timestamp,
`processed` timestamp,
`d` string,
`e` string,
`f` string )
PARTITIONED BY (
`year` int, `month` int, `day` int)
CLUSTERED BY (d)
INTO 6 BUCKETS
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='ZLIB',
'orc.compression.strategy'='SPEED',
'orc.create.index'='true',
'orc.encoding.strategy'='SPEED');
and in Spark code,
val hiveOrcWriter: DataStreamWriter[Row] = event_stream
.writeStream
.outputMode("append")
.format("orc")
.partitionBy("year","month","day")
//.option("compression", "zlib")
.option("path", _table_loc)
.option("checkpointLocation", _table_checkpoint)
I see that on a non partition table, records are inserted into Hive. However, on using partitioned table, the spark job does not fail or raise exceptions but records are not inserted to Hive table.
Appreciate comments from anyone who has dealt with similar problems.
Edit:
Just discovered that the .orc files are indeed written to the HDFS, withe correct partition directory structure: eg. /_table_loc/_table_name/year/month/day/part-0000-0123123.c000.snappy.orc
However
select * from 'XX' limit 1; (or where year=2018)
returns no rows.
The InputFormat
and OutputFormat
for the Table 'XX' are org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
and
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
respectively.
Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing. In the end, all the APIs are optimized using Spark catalyst optimizer and translated into RDDs for execution under the hood.
Spark Components It implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of it. The libraries built on top of these are: MLLib for machine learning, GraphFrames for graph analysis, and 2 APIs for stream processing: Spark Streaming and Structured Streaming.
Note right away that spark partitions ≠ hive partitions. They are both chunks of data, but Spark splits data in order to process it in parallel in memory. Hive partition is in the storage, in the disk, in persistence.
This feature isn't provided out of the box in structured streaming. In normal processing, you would use dataset.write.saveAsTable(table_name)
, and that method isn't available.
After processing and saving the data in HDFS, you can manually update the partitions (or using a script that does this on a schedule):
If you use Hive
MSCK REPAIR TABLE table_name
If you use Impala
ALTER TABLE table_name RECOVER PARTITIONS
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With