I have a Kafka producer and more than one consumer. I publish a message to a topic, and my consumers receive and process it.
I need the producer to receive a response from at least one consumer (ideally the first one to respond). I'm trying to do this with RxJava (Observables).
Is it possible to do it that way? Does anyone have an example?
Here is how I use RxJava 2.2.6, without any additional dependencies, to process Kafka events:
import io.reactivex.Observable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
...
// Load consumer props
Properties props = new Properties();
props.load(KafkaUtils.class.getClassLoader().getResourceAsStream("kafka-client.properties"));
// Create a consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Subscribe to topics (comma-separated list in the "kafkaTopics" property)
consumer.subscribe(Arrays.asList(props.getProperty("kafkaTopics").split("\\s*,\\s*")));
// Create an Observable for topic events.
// fromCallable() alone emits a single batch and completes; repeat() polls again after each batch.
Observable<ConsumerRecords<String, String>> observable = Observable
        .fromCallable(() -> consumer.poll(Duration.ofSeconds(10)))
        .repeat();
// Process Observable events (this call blocks the current thread while polling continues)
observable.subscribe(records -> {
    if (!records.isEmpty()) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
    }
});
You can use the kafka-rx library (Scala) as follows:
val consumer = new RxConsumer("zookeeper:2181", "consumer-group")
consumer.getRecordStream("cool-topic-(x|y|z)")
  .map(deserialize)
  .take(42 seconds)
  .foreach(println)
consumer.shutdown()
For more information see: https://github.com/cjdev/kafka-rx
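Neither snippet above covers the reply path back to the producer, so here is a minimal sketch of the "first response wins" part, under some assumptions. It assumes the producer tags each request with a correlation ID, the consumers publish their replies (carrying the same ID) to a separate reply topic, and the producer side polls that topic. `ReplyRouter`, `register` and `onReply` are hypothetical names, not part of Kafka or RxJava, and the Kafka calls are left out so that the example runs with the JDK alone.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: matches replies to pending requests by correlation ID.
class ReplyRouter {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before publishing a request; the ID is assumed to travel with the message.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call for every record read from the (assumed) reply topic.
    // Returns true only for the first reply to a pending request; later ones are ignored.
    boolean onReply(String correlationId, String payload) {
        CompletableFuture<String> future = pending.remove(correlationId);
        return future != null && future.complete(payload);
    }

    public static void main(String[] args) throws Exception {
        ReplyRouter router = new ReplyRouter();
        String id = UUID.randomUUID().toString();
        CompletableFuture<String> reply = router.register(id);

        // In a real setup the request would be published here, and the two calls
        // below would come from the loop that polls the reply topic.
        router.onReply(id, "reply from consumer A");
        router.onReply(id, "reply from consumer B"); // arrives second, so it is dropped

        System.out.println(reply.get(5, TimeUnit.SECONDS)); // prints "reply from consumer A"
    }
}
```

If you want to stay in RxJava, the returned future can be wrapped with `Single.fromFuture(reply)`. Note that this only works if every consumer that should answer actually receives the request, which usually means the consumers are in different consumer groups.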