How can I identify the topic name of a message in Kafka?
// Map each topic name to the number of consumer threads used for it
String[] topics = { "test", "test1", "test2" };
Map<String, Integer> topicMap = new HashMap<String, Integer>();
for (String t : topics) {
    topicMap.put(t, 3);
}

SparkConf conf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false")
        .setMaster("local[4]")
        .set("spark.cassandra.connection.host", "localhost");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
        .createStream(jssc, "localhost:2181", "test-group", topicMap);

JavaDStream<MessageAndMetadata> data = messages.map(
        new Function<Tuple2<String, String>, MessageAndMetadata>() {
            public MessageAndMetadata call(Tuple2<String, String> message) {
                // Only the message key and value are available here, not the topic
                System.out.println("message =" + message._2());
                return null;
            }
        });
I can fetch the messages sent by the Kafka producer. But since the consumer now reads from three topics, I need to identify which topic each message came from.
As of Spark 1.5.0, the official documentation encourages the receiver-less "direct" approach, which graduated from experimental status in that release. The direct API makes it easy to obtain each message together with its metadata, which includes the topic name, among other benefits.
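A minimal sketch of the direct approach, assuming Spark 1.x with the `spark-streaming-kafka` artifact (the Kafka 0.8 integration). The broker address `localhost:9092`, the class name `DirectKafkaTopics`, and the starting offsets (partition 0 of each topic, from offset 0) are illustrative assumptions, not values from the question. Note that the `createDirectStream` overload that accepts a message handler needs explicit starting offsets for every topic partition.

```java
import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class DirectKafkaTopics {
    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf()
                .setAppName("DirectKafkaTopics")
                .setMaster("local[4]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1000));

        // The direct API connects to the Kafka brokers rather than to ZooKeeper.
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092"); // assumed broker address

        // Starting offset for each topic partition.
        // Assumption: every topic has a single partition (0) and offset 0 still exists.
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
        for (String t : new String[] { "test", "test1", "test2" }) {
            fromOffsets.put(new TopicAndPartition(t, 0), 0L);
        }

        // The message handler sees the full MessageAndMetadata, including the topic name.
        JavaInputDStream<String> tagged = KafkaUtils.createDirectStream(
                jssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                String.class,
                kafkaParams,
                fromOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    public String call(MessageAndMetadata<String, String> mmd) {
                        return mmd.topic() + "\t" + mmd.message();
                    }
                });

        tagged.print();
        jssc.start();
        jssc.awaitTermination();
    }
}
```

Each record in `tagged` is the topic name and the message value separated by a tab. In a real job the handler could return a small serializable holder object instead of a concatenated string.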