
Kafka Connect: multiple topics in sink connector properties

I am trying to read two Kafka topics using the Cassandra sink connector and insert the data into two Cassandra tables. How can I go about doing this?

This is my connector.properties file:

name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=topic1,topic2
# One KCQL INSERT statement per topic-to-table mapping, separated by semicolons
connect.cassandra.kcql=INSERT INTO ks.table1 SELECT * FROM topic1;INSERT INTO ks.table2 SELECT * FROM topic2
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=ks
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra

Am I doing everything right? Is this the best way of doing this or should I create two separate connectors?

asked Feb 23 '18 by el323

People also ask

How does a Kafka sink connector work?

A sink connector delivers data from Kafka topics into other systems, which might be indexes such as Elasticsearch, batch systems such as Hadoop, or any kind of database. Some connectors are maintained by the community, while others are supported by Confluent or its partners.

How many modes exist in Kafka Connect?

There are two modes for running workers. Standalone mode is useful for development and testing Kafka Connect on a local machine; it can also be used for environments that typically use single agents (for example, sending web server logs to Kafka). Distributed mode runs Connect workers on multiple machines, which form a Connect cluster, with running connectors distributed across the cluster.
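To illustrate the difference, a standalone worker is launched with its connector property files on the command line, while a distributed worker takes only a worker config and receives connectors through its REST API (file names and paths below are placeholders):

# Standalone mode: worker config plus one or more connector property files
bin/connect-standalone.sh config/connect-standalone.properties connector.properties

# Distributed mode: worker config only; connectors are then submitted over REST
bin/connect-distributed.sh config/connect-distributed.properties
curl -X POST -H "Content-Type: application/json" \
     --data @cassandra-sink.json http://localhost:8083/connectors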

What are connectors in Kafka Connect?

The Kafka Connect JMS Source connector is used to move messages from any JMS-compliant broker into Apache Kafka®. The Kafka Connect Elasticsearch Service Sink connector moves data from Apache Kafka® to Elasticsearch. It writes data from a topic in Kafka to an index in Elasticsearch.
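As a concrete example, a minimal Elasticsearch sink configuration might look roughly like this (a sketch, not a complete production config; the topic name and the local connection.url are placeholder assumptions):

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=orders
connection.url=http://localhost:9200
key.ignore=true

Each record from the orders topic would then be written as a document to an Elasticsearch index named after the topic; key.ignore=true tells the connector to generate document ids instead of using the record keys.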

What is the Kafka Connect HTTP sink connector?

The Kafka Connect HTTP Sink connector integrates Apache Kafka® with an API via HTTP or HTTPS. The connector consumes records from Kafka topic(s) and converts each record value to a String, or to JSON with request.body.format=json, before sending it in the request body to the configured http.api.url.
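Put together from the properties named above, an HTTP sink configuration could be sketched as follows (the endpoint URL is a placeholder, and a real deployment needs additional connector-specific settings):

name=http-sink
connector.class=io.confluent.connect.http.HttpSinkConnector
tasks.max=1
topics=topic1
http.api.url=http://localhost:8080/api/messages
request.method=POST
request.body.format=json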


2 Answers

There's one issue with your config: you need one task per topic-partition. So if each of your topics has one partition, you need tasks.max set to at least 2.

I don't see this documented in Connect's docs, which is a shame.
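Following that suggestion, with two single-partition topics the only change to the config in the question would be:

tasks.max=2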

answered Oct 30 '22 by Yevgeny Khodorkovsky


If you want to consume those two topics in one connector, that's fine, and your setup is correct. The best way of doing this depends on whether those messages should be consumed by one consumer or by two, so it comes down to your business logic.

Anyway, if you want to consume two topics via one consumer, that should work fine, since a consumer can subscribe to multiple topics. Did you try running this consumer? Is it working?
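Under the hood this is just the standard consumer API, where a single consumer subscribes to several topics at once. A minimal plain-Java sketch (broker address, group id, and topic names are placeholder assumptions):

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TwoTopicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "two-topic-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // One consumer, two topics: records from either topic arrive in the same poll loop
            consumer.subscribe(Arrays.asList("topic1", "topic2"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s -> %s%n", record.topic(), record.value());
                }
            }
        }
    }
}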

answered Oct 30 '22 by ctomek