
[HDFS connector + Kafka] How to write multiple topics in standalone mode?

I am using Confluent's HDFS Connector to write streamed data to HDFS. I followed the user manual and quick start and set up my connector. It works properly when I consume only one topic. My properties file looks like this:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_topic1
hdfs.url=hdfs://localhost:9000
flush.size=30

When I add more than one topic, I see it continuously committing offsets, but I do not see it writing the committed messages.

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=2
topics=test_topic1,test_topic2
hdfs.url=hdfs://localhost:9000
flush.size=30

I tried tasks.max set to 1 and to 2. In both cases, "Committing offsets" is logged continuously, as below:

[2016-10-26 15:21:30,990] INFO Started recovery for topic partition test_topic1-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-10-26 15:21:31,222] INFO Finished recovery for topic partition test_topic1-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-10-26 15:21:31,230] INFO Started recovery for topic partition test_topic2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:193)
[2016-10-26 15:21:31,236] INFO Finished recovery for topic partition test_topic2-0 (io.confluent.connect.hdfs.TopicPartitionWriter:208)
[2016-10-26 15:21:35,155] INFO Reflections took 6962 ms to scan 249 urls, producing 11712 keys and 77746 values  (org.reflections.Reflections:229)
[2016-10-26 15:22:29,226] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:23:29,227] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:24:29,225] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)
[2016-10-26 15:25:29,224] INFO WorkerSinkTask{id=hdfs-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:261)

When I gracefully stop the service (Ctrl+C), I see it removing the tmp files. What am I doing wrong? What is the proper way to do this? I would appreciate any suggestions.

NehaM asked Jan 28 '26 13:01


1 Answer

I kept stumbling over the same problem for the past month or so and couldn't get to the bottom of it, until today, when I upgraded to Confluent 3.1.1 and everything started working as expected.

This is the configuration I use:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=5
topics=accounts,contacts,users
hdfs.url=hdfs://localhost:9000
flush.size=1
hive.metastore.uris=thrift://localhost:9083
hive.integration=true
schema.compatibility=BACKWARD
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
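
To check whether files are actually landing in HDFS for each configured topic, a small helper along these lines may be useful. It is only a sketch: it assumes the connector's topics.dir setting is left at its default of "topics" unless the properties file overrides it, and the function names (get_prop, expected_topic_dirs) and the file name in the usage comment are made up for illustration. It prints only the topic-level directory, since the subdirectories beneath it depend on the partitioner in use.

```shell
#!/usr/bin/env bash
# Print the HDFS directory the connector is expected to write to for each
# topic listed in a connector properties file.
# Assumption: topics.dir defaults to "topics" when it is not set in the file.

# Read one key from a properties file (simple key=value lines only).
get_prop() {
  local key="$1" file="$2"
  # If the key is missing, this prints nothing.
  grep -E "^${key}=" "$file" | tail -n 1 | cut -d= -f2- || true
}

# Print one line per topic: /<topics.dir>/<topic>
expected_topic_dirs() {
  local file="$1" topics topics_dir topic
  topics="$(get_prop topics "$file")"
  topics_dir="$(get_prop 'topics\.dir' "$file")"
  topics_dir="${topics_dir:-topics}"
  local IFS=','   # split the comma-separated topic list
  for topic in $topics; do
    printf '/%s/%s\n' "${topics_dir#/}" "$topic"
  done
}

# Usage (hypothetical file name; requires a working hdfs client):
#   expected_topic_dirs hdfs-sink.properties | xargs -n 1 hdfs dfs -ls -R
```

If a topic's directory stays empty while offsets keep being committed, it may be worth checking whether each topic partition has received at least flush.size records, since that setting controls how many records are written before a file is committed.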
Filip answered Jan 30 '26 11:01


