
Get topic from kafka message

How can I identify the topic name from a message in Kafka?

// topic name -> number of consumer threads for that topic
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = { "test", "test1", "test2" };
for (String t : topics) {
    topicMap.put(t, 3);
}

SparkConf conf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false")
        .setMaster("local[4]")
        .set("spark.cassandra.connection.host", "localhost");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
        .createStream(jssc, "localhost:2181", "test-group", topicMap);

JavaDStream<MessageAndMetadata> data =
        messages.map(new Function<Tuple2<String, String>, MessageAndMetadata>() {
            public MessageAndMetadata call(Tuple2<String, String> message) {
                // message._1 is the key and message._2 is the payload;
                // the topic name is not part of the tuple.
                System.out.println("message =" + message._2);
                return null;
            }
        });

I can fetch messages from the Kafka producer. But since the consumer now reads from three topics, I need to identify which topic each message came from.

Arun M R Nair asked Nov 10 '22 13:11



1 Answer

As of Spark 1.5.0, the official documentation encourages the receiver-less "direct" approach, which graduated from experimental status in that release. Among other benefits, the Direct API lets you obtain each message together with its metadata, including the topic it came from.
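In the Spark 1.x Kafka integration, `KafkaUtils.createDirectStream` has an overload that accepts a per-record message handler of type `Function<MessageAndMetadata<K, V>, R>`, and that handler is where the topic can be read. The sketch below models only the handler idea in plain Java so that it runs without Spark or Kafka on the classpath. The names `Meta`, `TopicMessage`, `HANDLER`, and `tagAll` are hypothetical stand-ins for illustration, not part of either library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class TopicTagging {

    // Hypothetical stand-in for kafka.message.MessageAndMetadata:
    // the payload plus the topic, partition and offset it was read from.
    static final class Meta {
        final String topic;
        final int partition;
        final long offset;
        final String key;
        final String message;

        Meta(String topic, int partition, long offset, String key, String message) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.message = message;
        }
    }

    // What the handler emits per record: the payload tagged with its topic.
    static final class TopicMessage {
        final String topic;
        final String payload;

        TopicMessage(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    // The handler is called once per record and decides what to keep
    // from the metadata; here it keeps the topic name and the payload.
    static final Function<Meta, TopicMessage> HANDLER =
            m -> new TopicMessage(m.topic, m.message);

    // Applies the handler to a batch, as a stream would do per record.
    static List<TopicMessage> tagAll(List<Meta> records) {
        List<TopicMessage> out = new ArrayList<>();
        for (Meta m : records) {
            out.add(HANDLER.apply(m));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Meta> batch = Arrays.asList(
                new Meta("test", 0, 0L, null, "first"),
                new Meta("test2", 1, 7L, null, "second"));
        for (TopicMessage tm : tagAll(batch)) {
            System.out.println(tm.topic + " -> " + tm.payload);
        }
    }
}
```

In a real job the handler would be passed to the direct stream, and downstream operations could then branch on the topic field of each record instead of seeing only a key and value pair.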

Andrii Rubtsov answered Nov 15 '22 06:11
