I have a directory (Final Dir) in HDFS into which small files (e.g. 10 MB each) are loaded every minute. After some time I want to combine all the small files into one large file (e.g. 100 MB). But the user keeps pushing files to Final Dir; it is a continuous process.
So the first time, I need to combine the first 10 files into a large file (e.g. large.txt) and save it to Final Dir.
Now my question is: how do I get the next 10 files, excluding the first 10?
Can someone please help me?
Here is one more alternative. It is still the legacy approach pointed out by @Andrew in his comments, but with extra steps: the input folder acts as a buffer that receives the small files, which are periodically moved to a tmp directory, merged, and the result pushed back to input.
step 1 : create a tmp directory
hadoop fs -mkdir tmp
step 2 : move all the small files present at that moment to the tmp directory
hadoop fs -mv input/*.txt tmp
step 3 : merge the small files with the help of the hadoop-streaming jar
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
-Dmapred.reduce.tasks=1 \
-input "/user/abc/tmp" \
-output "/user/abc/output" \
-mapper cat \
-reducer cat
step 4 : move the output to the input folder
hadoop fs -mv output/part-00000 input/large_file.txt
step 5 : remove output
hadoop fs -rm -R output/
step 6 : remove all the files from tmp
hadoop fs -rm tmp/*.txt
Create a shell script from steps 2 to 6 and schedule it to run at regular intervals to merge the small files (maybe every minute, depending on your needs).
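Step 2 moves every .txt file that is present at that moment. If you would rather move exactly ten files per run, as asked in the question, one option is to pick the oldest entries from the directory listing. The following is only a sketch: the function name oldest_files is made up for illustration, and it assumes the usual eight-column `hadoop fs -ls` output (permissions, replication, owner, group, size, date, time, path) and paths without spaces.

```shell
#!/usr/bin/env bash
# oldest_files N
# Reads `hadoop fs -ls` output on stdin and prints the paths of the N oldest files.
# Assumes the usual 8-column listing and paths that contain no spaces.
oldest_files() {
  local n="$1"
  # Keep regular files only (permission string starts with "-"); this also
  # drops the "Found N items" header and any sub-directories.
  # Columns 6 and 7 hold the modification date and time, column 8 the path.
  awk '$1 ~ /^-/ { print $6, $7, $8 }' \
    | LC_ALL=C sort \
    | head -n "$n" \
    | awk '{ print $3 }'
}

# Hypothetical usage, replacing step 2:
#   hadoop fs -ls /user/abc/input | oldest_files 10 | while read -r f; do
#     hadoop fs -mv "$f" /user/abc/tmp/
#   done
```

Because the selected files leave the input directory, the next run naturally sees only the files that arrived afterwards.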
Steps to schedule a cron job for merging small files
step 1: create a shell script /home/abc/mergejob.sh from the steps above (2 to 6)
important note: you need to specify the absolute path of hadoop in the script so that cron can find it
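#!/bin/bash
# Stop at the first failing command so that a failed merge never deletes unmerged files.
set -e
HADOOP=/home/abc/hadoop-2.6.0/bin/hadoop
# step 2: move the small files out of the buffer directory
# (the glob is quoted so that HDFS, not the local shell, expands it)
$HADOOP fs -mv 'input/*.txt' tmp
# step 3: merge everything in tmp into a single reducer output
$HADOOP jar /home/abc/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
-Dmapred.reduce.tasks=1 \
-input "/user/abc/tmp" \
-output "/user/abc/output" \
-mapper cat \
-reducer cat
# step 4: publish the merged file in the input folder
$HADOOP fs -mv output/part-00000 input/large_file.txt
# step 5: remove the job output directory
$HADOOP fs -rm -R output/
# step 6: remove the merged small files
$HADOOP fs -rm 'tmp/*.txt'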
step 2: schedule the script with cron to run every minute
a) edit crontab by choosing an editor
>crontab -e
b) add the following line at the end and exit from the editor
* * * * * /bin/bash /home/abc/mergejob.sh > /dev/null 2>&1
The merge job will now run every minute.
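One thing to watch with a one-minute schedule: if a merge takes longer than a minute, cron starts a second run while the first is still working on tmp. A simple guard is a lock directory, since mkdir succeeds for only one caller at a time. This is a minimal sketch, not part of the original steps; the function name run_exclusively and the lock path are assumptions.

```shell
#!/usr/bin/env bash
# Hypothetical lock location; any path writable by the cron user works.
LOCK_DIR="${LOCK_DIR:-/tmp/mergejob.lock}"

# run_exclusively CMD [ARGS...]
# Runs CMD only if no other run currently holds the lock.
run_exclusively() {
  local status
  # mkdir is atomic: it succeeds for exactly one caller at a time.
  if ! mkdir "$LOCK_DIR" 2>/dev/null; then
    echo "previous merge still running, skipping this run" >&2
    return 0
  fi
  "$@"
  status=$?
  # Release the lock whether or not the command succeeded.
  rmdir "$LOCK_DIR"
  return "$status"
}

# Hypothetical usage from a wrapper script that cron calls:
#   run_exclusively /bin/bash /home/abc/mergejob.sh
```

If the job is killed before rmdir runs, the lock directory stays behind and has to be removed by hand.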
Hope this was helpful.
@Andrew pointed you to a solution that was appropriate 6 years ago, in a batch-oriented world.
But it's 2016, you have a micro-batch data flow running and require a non-blocking solution.
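This is how I would do it:
- use three directories, e.g. new_data, reorg and history (for instance three partitions of one Hive table)
- let the data flow keep writing its new files into new_data
Now the batch compaction logic:
- move the files that are ready for compaction from the new_data directory to reorg
- merge the content of all the reorg files into a new file in the history dir (feel free to GZip it on the fly, Hive will recognize the .gz extension)
- delete the files in reorg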
So it's basically the old 2010 story, except that your existing data flow can continue dumping new files into new_data while the compaction is safely running in separate directories. And in case the compaction job crashes, you can safely investigate / clean up / resume the compaction without compromising the data flow.
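The new_data / reorg / history flow described above could be scripted roughly as follows. This is a sketch under assumptions: the three HDFS paths, the .txt suffix and the function name compact_once are placeholders, and it relies on `hadoop fs -put -` reading from stdin.

```shell
#!/usr/bin/env bash
# All directory names below are hypothetical; override them via the environment.
NEW_DIR="${NEW_DIR:-/user/abc/new_data}"
REORG_DIR="${REORG_DIR:-/user/abc/reorg}"
HISTORY_DIR="${HISTORY_DIR:-/user/abc/history}"

compact_once() {
  local merged
  merged="$HISTORY_DIR/merged_$(date +%Y%m%d%H%M%S).gz"

  # 1. Move the files that are present right now out of the landing directory.
  #    Globs are quoted so that HDFS, not the local shell, expands them.
  hadoop fs -mv "$NEW_DIR/*.txt" "$REORG_DIR/" || return 1

  # 2. Concatenate the reorg files, gzip on the fly, write one file to history.
  #    pipefail makes a failing -cat abort the run instead of being ignored.
  ( set -o pipefail
    hadoop fs -cat "$REORG_DIR/*.txt" | gzip | hadoop fs -put - "$merged"
  ) || return 1

  # 3. Only after a successful merge, drop the compacted inputs.
  hadoop fs -rm "$REORG_DIR/*.txt"
}

# Hypothetical usage: call compact_once from the scheduled job.
```

Files that arrive in new_data during the run are untouched and are picked up by the next run. If step 2 fails, the inputs are still sitting in reorg for inspection or a retry.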
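If the three directories are mapped to partitions of a Hive table, the merge step can be a plain query:
INSERT INTO TABLE blahblah PARTITION (stage='history')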
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg'
;
With a couple of SET some.property = somevalue statements before that query, you can define which compression codec will be applied to the result file(s), how many files you want (or more precisely, how big you want the files to be; Hive will run the merge accordingly), etc.
Look into https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties under hive.merge.mapfiles and hive.merge.mapredfiles (or hive.merge.tezfiles if you use TEZ) and hive.merge.smallfiles.avgsize, then hive.exec.compress.output and mapreduce.output.fileoutputformat.compress.codec, plus hive.hadoop.supports.splittable.combineinputformat to reduce the number of Map containers, since your input files are quite small.
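To make the SET statements concrete, here is a small sketch that prints a HiveQL script combining some of the properties named above with the query. The values (a threshold of about 100 MB, the GZip codec) are assumptions chosen for illustration, and the function name print_compaction_hql is made up; check each property against the Hive version you run.

```shell
#!/usr/bin/env bash
# Prints a HiveQL script; the property values are illustrative assumptions.
print_compaction_hql() {
  cat <<'EOF'
-- Merge small result files at the end of map-only and map-reduce jobs.
SET hive.merge.mapfiles=true;
SET hive.merge.mapredfiles=true;
-- Run the merge when the average result file is below ~100 MB (assumed value).
SET hive.merge.smallfiles.avgsize=104857600;
-- Compress the result files with GZip.
SET hive.exec.compress.output=true;
SET mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

INSERT INTO TABLE blahblah PARTITION (stage='history')
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg';
EOF
}

# Hypothetical usage:
#   print_compaction_hql > /tmp/compact.hql && hive -f /tmp/compact.hql
```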
[*] very old SF reference here :-)