The following has already been achieved.
While this is working fine so far, there is one issue I am facing: when my app inserts data into the Hive table, it creates a small file with each row's data per file.
Below is the code:
// Define which topics to read from
val topic = "topic_twitter"
val groupId = "group-1"
val consumer = KafkaConsumer(topic, groupId, "localhost:2181")
// Create SparkContext
val sparkContext = new SparkContext("local[2]", "KafkaConsumer")
// Create HiveContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sparkContext)
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS twitter_data (tweetId BIGINT, tweetText STRING, userName STRING, tweetTimeStamp STRING, userLang STRING)")
hiveContext.sql("CREATE EXTERNAL TABLE IF NOT EXISTS demo (foo STRING)")
The Hive demo table is already populated with a single record. The Kafka consumer loops through the data for topic "topic_twitter", processes each row, and inserts it into the Hive table:
val hiveSql = "INSERT INTO TABLE twitter_data " +
  "SELECT STACK(1, " +
  tweetID + ", '" +
  tweetText + "', '" +
  userName + "', '" +
  tweetTimeStamp + "', '" +
  userLang + "') FROM demo LIMIT 1"
hiveContext.sql(hiveSql)
Below are images from my Hadoop environment showing the twitter_data and demo tables.

Last 10 files created in HDFS:
As you can see, no file is larger than 200 KB. Is there a way I can merge these files into one file?
[take 2] OK, so you can't properly "stream" data into Hive. But you can add a periodic compaction post-processing job...
The idea: partition the table on an extra "role" column, e.g. (role='activeA'), (role='activeB'), (role='archive'). Point your streaming inserts at the "A" partition, switch them to "B" at some point, then dump every record that you have collected in the "A" partition into "archive", hoping that the Hive default config will do a good job of limiting fragmentation:
INSERT INTO TABLE twitter_data PARTITION (role='archive')
SELECT ...
FROM twitter_data WHERE role='activeA'
;
TRUNCATE TABLE twitter_data PARTITION (role='activeA')
;
At some point, switch back to "A", etc.
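A minimal Scala sketch of how such a compaction pass could be driven from the same HiveContext; the compactPartition helper and the assumption that twitter_data is partitioned by a "role" column are mine, not part of the original setup:
// Hypothetical compaction pass: assumes twitter_data is partitioned by a "role"
// column and that the streaming inserts have already been switched away from
// the partition being compacted.
def compactPartition(hiveContext: org.apache.spark.sql.hive.HiveContext,
                     compactRole: String): Unit = {
  // Rewrite everything collected in the active partition into the archive
  // partition; the rewrite gives Hive a chance to produce fewer, larger files.
  hiveContext.sql(
    "INSERT INTO TABLE twitter_data PARTITION (role='archive') " +
    "SELECT tweetId, tweetText, userName, tweetTimeStamp, userLang " +
    s"FROM twitter_data WHERE role='$compactRole'")
  // Drop the small files that were just archived.
  hiveContext.sql(s"TRUNCATE TABLE twitter_data PARTITION (role='$compactRole')")
}
For example, call compactPartition(hiveContext, "activeA") once the writers have moved on to "activeB", and vice versa on the next cycle.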
One last word: if Hive still creates too many files on each compaction job, then try tweaking some parameters in your session, just before the INSERT, e.g.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.smallfiles.avgsize=1024000000;
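If the compaction runs from Spark rather than from a Hive CLI session, the same settings can be issued through the HiveContext right before the INSERT; a small sketch, assuming the hiveContext from the question:
// Session-level merge hints, issued just before the compaction INSERT.
hiveContext.sql("set hive.merge.mapfiles=true")
hiveContext.sql("set hive.merge.mapredfiles=true")
hiveContext.sql("set hive.merge.smallfiles.avgsize=1024000000")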