 

Delta/Incremental Load in Hive

I have the following use case:

My application has a table with multiple years of data in an RDBMS. We used Sqoop to bring the data into HDFS and loaded it into a Hive table partitioned by year and month.

The application also updates and inserts new records into the RDBMS table daily. These updated records can span historical months. Updated and newly inserted records can be identified by an updated-timestamp field (it carries the current day's timestamp).

The problem is: how do we do a daily delta/incremental load of the Hive table using these updated records?

-> I know Sqoop has incremental import functionality, but an incremental import of new records alone is not enough for us.

Because:

-> I cannot simply insert these records (using INSERT INTO) into the Hive table, because that would create duplicates of the updated records.

-> Likewise, I cannot use an INSERT OVERWRITE statement, since these are only the updated and newly inserted records spanning multiple months; INSERT OVERWRITE would wipe out the earlier records.

Of course, the easiest option is to re-import the full data set with Sqoop every day, but we don't want to do that because the data volume is large.

So, basically, we want to fully reload only those partitions for which we have received updated/inserted records.

We are open to exploring options on either the Hive or Sqoop side. Can you please advise?

Thanks in advance.

asked Jun 12 '14 by jigarshah

People also ask

What is incremental load in hive?

Incremental load is commonly used to implement slowly changing dimensions. When you migrate your data to Hadoop/Hive, you usually need to keep the slowly changing tables in sync with the latest source data.

Is Delta load same as incremental load?

Yes, a delta load and an incremental load are the same thing: instead of reloading the full data set, only the records that are new or have changed since the last load are loaded.

How do I load incremental data in Hive using Sqoop?

We can use the Sqoop incremental import command with the "--merge-key" option to update records in an already imported Hive table. --incremental lastmodified imports the updated and new records from the RDBMS (MySQL) database based on the latest value of the timestamp column (e.g. emp_timestamp) already present in Hive.
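
For illustration, a hedged sketch of such an import; the connection string, table, directory, column, and key names are placeholders, not taken from the question:

    sqoop import \
      --connect jdbc:mysql://dbhost/appdb \
      --username app_user -P \
      --table sales \
      --target-dir /data/sales \
      --incremental lastmodified \
      --check-column updated_ts \
      --last-value "2014-06-11 00:00:00" \
      --merge-key sale_id

With --incremental lastmodified plus --merge-key, Sqoop imports rows whose updated_ts is newer than --last-value and then merges them onto the existing files in --target-dir, keeping one row per sale_id. Note that in many Sqoop 1.x releases this combination cannot be used together with --hive-import, so a common pattern is to merge into an HDFS directory that backs an external Hive table.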

What is HiveQL in big data?

HiveQL is the SQL-like query language used by Apache Hive. Hive allows users to read, write, and manage petabytes of data using SQL. Hive is built on top of Apache Hadoop, an open-source framework for efficiently storing and processing large datasets. As a result, Hive is closely integrated with Hadoop and is designed to work quickly on petabytes of data.


2 Answers

Updates are a notoriously difficult problem for any Hive-based system.

One typical approach is a two-step process:

  1. Insert any data that has changed into one table. As you said, this will result in duplicates when rows are updated.
  2. Periodically overwrite a second table with "de-duplicated" data from the first table.

The second step is potentially painful, but there's really no way around it. At some level you have to overwrite, since Hive doesn't do in-place updates. Depending on your data, though, you may be able to partition the tables cleverly enough to avoid full overwrites. For example, if step 1 only inserts into a handful of partitions, then only those partitions need to be overwritten into the second table.
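
A minimal HiveQL sketch of step 2, assuming table 1 is sales_raw (base data plus appended daily deltas, so it may contain duplicates), table 2 is sales_clean partitioned by (year, month), sale_id is the business key, updated_ts is the update timestamp, and load_date marks today's delta; all of these names and the hiveconf variable are placeholders:

    SET hive.exec.dynamic.partition=true;
    SET hive.exec.dynamic.partition.mode=nonstrict;

    -- Rebuild only the partitions touched by today's delta,
    -- keeping the newest version of each sale_id.
    INSERT OVERWRITE TABLE sales_clean PARTITION (year, month)
    SELECT sale_id, amount, updated_ts, year, month
    FROM (
      SELECT r.*,
             ROW_NUMBER() OVER (PARTITION BY r.sale_id
                                ORDER BY r.updated_ts DESC) AS rn
      FROM sales_raw r
      JOIN (SELECT DISTINCT year, month
            FROM sales_raw
            WHERE updated_ts >= '${hiveconf:load_date}') d
        ON r.year = d.year AND r.month = d.month
    ) ranked
    WHERE rn = 1;

Only the (year, month) partitions that actually appear in the delta get rewritten; untouched history stays as it is.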

Also, depending on the access pattern, it can make sense to just have the second "de-duplicated" table be a view and not materialize it at all. Usually this just delays the pain to query time, though.
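
The view variant, using the same placeholder names as above, would look roughly like this; every query then pays the de-duplication cost:

    CREATE VIEW sales_latest AS
    SELECT sale_id, amount, updated_ts, year, month
    FROM (
      SELECT r.*,
             ROW_NUMBER() OVER (PARTITION BY r.sale_id
                                ORDER BY r.updated_ts DESC) AS rn
      FROM sales_raw r
    ) ranked
    WHERE rn = 1;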

The only other way round this I've seen is using a very custom input and output format. Rather than explain it all, you can read about it here: http://pkghosh.wordpress.com/2012/07/08/making-hive-squawk-like-a-real-database/

Owen O'Malley has also been working on adding a version of this idea to standard Hive, but that's still in development: https://issues.apache.org/jira/browse/HIVE-5317

answered Oct 20 '22 by Joe K

You can use a direct MapReduce approach for bulk insert, update, and delete; details are in the link below. It is essentially a merge-and-compact operation. A secondary sort is performed on a timestamp or sequence field, either in the record or encoded in the HDFS file names, and the latest version of each record from the reduce-side join is emitted as output.

https://pkghosh.wordpress.com/2015/04/26/bulk-insert-update-and-delete-in-hadoop-data-lake/
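
This is not the blog's MapReduce code, but Sqoop's built-in merge tool performs a comparable key-based, last-record-wins compaction of a delta directory onto a base directory; a hedged sketch with placeholder paths, class, and key names:

    sqoop merge \
      --new-data /data/sales_delta \
      --onto /data/sales_base \
      --target-dir /data/sales_merged \
      --jar-file sales_record.jar \
      --class-name SalesRecord \
      --merge-key sale_id

The record class and jar would come from an earlier sqoop import or sqoop codegen run against the same table.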

answered Oct 20 '22 by Pranab