Using Kafka through Observable(RxJava)

I have a producer (using Kafka) and more than one consumer. I publish a message to a topic, and my consumers receive and process it.

I need the producer to receive a response from at least one consumer (preferably the first one to reply). I'm trying to do this with RxJava observables.

Is it possible to do it that way? Does anyone have an example?

André Rodrigo dos Santos asked Dec 24 '22 18:12


2 Answers

Here is how I use RxJava 2.2.6, without any additional dependencies, to process Kafka events:

import io.reactivex.Observable;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

...

// Load consumer props 
Properties props = new Properties();  
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties")); 

// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// Subscribe to topics
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\\s*,\\s*")));

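// Create an Observable for topic events.
// fromCallable emits a single batch per subscription, so repeat() re-subscribes
// after each batch to keep polling. Polling runs on the subscribing thread,
// which matters because KafkaConsumer is not thread-safe.
Observable<ConsumerRecords<String, String>> observable = Observable
    .fromCallable(() -> consumer.poll(Duration.ofSeconds(10)))
    .repeat();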

// Process Observable events
observable.subscribe(records -> {
    if ((records != null) && (!records.isEmpty())) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
    }
});
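
The snippet above only covers consuming. For the reply part of the question, one common approach is to tag each request with a correlation ID, have the consumers publish their replies to a separate reply topic, and let the producer side keep only the first reply per ID. Here is a minimal sketch of that bookkeeping using only the JDK; `ReplyCorrelator`, `register` and `complete` are hypothetical names, and sending the request and polling the reply topic are assumed to happen elsewhere:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical helper: matches replies to requests by correlation ID; the first reply wins. */
class ReplyCorrelator {
    // Requests that have been sent but not yet answered, keyed by correlation ID.
    private final ConcurrentMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call before sending the request. The returned future completes with the first reply. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    /** Call from the reply-topic poll loop. Returns true only for the first reply to an ID. */
    boolean complete(String correlationId, String payload) {
        // remove() is atomic, so only one caller ever obtains the pending future.
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(payload);
    }
}
```

Replies that arrive after the first one find no pending entry and are ignored. If you want the result as an Rx type, RxJava 2 can wrap the returned future with `Single.fromFuture(future)`.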

Farrukh Najmi answered Feb 07 '23 01:02


You can use the kafka-rx library (Scala) as follows:

val consumer = new RxConsumer("zookeeper:2181", "consumer-group")

consumer.getRecordStream("cool-topic-(x|y|z)")
  .map(deserialize)
  .take(42 seconds)
  .foreach(println)

consumer.shutdown()

For more information see: https://github.com/cjdev/kafka-rx

Tiago Costa answered Feb 07 '23 00:02