How to efficiently move data from Kafka to an Impala table?

Here are the steps in the current process:

  1. Flafka writes logs to a 'landing zone' on HDFS.
  2. A job, scheduled by Oozie, copies complete files from the landing zone to a staging area.
  3. The staging data is 'schema-ified' by a Hive table that uses the staging area as its location.
  4. Records from the staging table are added to a permanent Hive table (e.g. insert into permanent_table select * from staging_table).
  5. The data, from the Hive table, is available in Impala by executing refresh permanent_table in Impala.
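For concreteness, steps 3–5 look roughly like this (table names, columns, and the staging path are placeholders, not the actual schema):

-- Step 3: external Hive table layered over the staging directory
CREATE EXTERNAL TABLE staging_table (
  event_time STRING,
  payload    STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/data/staging';

-- Step 4: append the staged records to the permanent Hive table
INSERT INTO TABLE permanent_table SELECT * FROM staging_table;

-- Step 5: make the new data visible to Impala (run in impala-shell)
REFRESH permanent_table;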

[Diagram: existing data flow]

I look at the process I've built and it "smells" bad: there are too many intermediate steps that impair the flow of data.

About 20 months ago, I saw a demo where data was being streamed from an Amazon Kinesis pipe and was queryable, in near real-time, by Impala. I don't suppose they did something quite so ugly/convoluted. Is there a more efficient way to stream data from Kafka to Impala (possibly a Kafka consumer that can serialize to Parquet)?

I imagine that "streaming data to low-latency SQL" must be a fairly common use case, and so I'm interested to know how other people have solved this problem.

asked Jan 25 '16 by Alex Woolford
1 Answer

If you need to dump your Kafka data as-is to HDFS, the best option is Kafka Connect with the Confluent HDFS connector.

You can dump the data to Parquet files on HDFS that you then load into Impala. I think you'll want to use the TimeBasedPartitioner to roll a new Parquet file every X milliseconds (tuned via the partition.duration.ms configuration parameter).

Adding something like this to your Kafka Connect configuration might do the trick:

# Don't flush fewer than 1000 messages to HDFS
flush.size=1000

# Write Parquet files
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

# Partition output files by time
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner

# One file every hour. If you change this, remember to change the filename format to match
partition.duration.ms=3600000

# Filename format
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/'minute'=mm

# TimeBasedPartitioner also needs a locale and timezone
locale=en
timezone=UTC
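On the Impala side you could then point an external Parquet table directly at the connector's output directory, which removes the staging/copy steps. A rough sketch, where the table name, columns, and HDFS path are assumptions rather than anything from the connector docs:

-- External Impala table over the connector's Parquet output (hypothetical path/columns)
CREATE EXTERNAL TABLE kafka_events (
  payload STRING
)
PARTITIONED BY (year INT, month INT, day INT, hour INT, minute INT)
STORED AS PARQUET
LOCATION '/topics/my_topic';

-- Pick up new partitions/files written by the connector, then refresh metadata
ALTER TABLE kafka_events RECOVER PARTITIONS;
REFRESH kafka_events;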
answered Oct 24 '22 by Iñigo González