How to create InputDStream with offsets in PySpark (using KafkaUtils.createDirectStream)?

How do I use KafkaUtils.createDirectStream with specific offsets for a particular topic in PySpark?

Phineas Dashevsky asked Oct 21 '15 20:10

1 Answer

If you want to create an RDD from the records in a Kafka topic, use KafkaUtils.createRDD with a static list of OffsetRange objects that describe exactly which offsets to read.

First, import what you need:

from pyspark.streaming.kafka import KafkaUtils, OffsetRange

Then create a dictionary of Kafka parameters containing the list of brokers:

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}

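Then create the offset ranges:

start = 0        # first offset to read (inclusive)
until = 10       # offset to stop at (exclusive)
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, start, until)
offsets = [offset]  # one OffsetRange per partition you want to read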

Finally, create the RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)  # sc is your existing SparkContext
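An OffsetRange covers the half-open interval [fromOffset, untilOffset), so the range above (0 to 10) reads the ten messages at offsets 0 through 9 of partition 0. To read several partitions in one createRDD call, you pass one range per partition. The sketch below builds such a list. It uses a hypothetical stand-in named tuple with the same argument order as pyspark.streaming.kafka.OffsetRange (topic, partition, fromOffset, untilOffset) so it runs without Spark, and the helpers `offset_ranges_for` and `message_count` are hypothetical names for illustration.

```python
from collections import namedtuple

# Hypothetical stand-in with the same argument order as
# pyspark.streaming.kafka.OffsetRange(topic, partition, fromOffset, untilOffset).
OffsetRange = namedtuple("OffsetRange", ["topic", "partition", "fromOffset", "untilOffset"])


def offset_ranges_for(topic, bounds):
    """Build one range per partition from {partition: (start, until)}.

    `until` is exclusive, matching the OffsetRange semantics.
    """
    ranges = []
    for partition, (start, until) in sorted(bounds.items()):
        if until < start:
            raise ValueError("untilOffset must be >= fromOffset")
        ranges.append(OffsetRange(topic, partition, start, until))
    return ranges


def message_count(ranges):
    # Each range contributes untilOffset - fromOffset messages.
    return sum(r.untilOffset - r.fromOffset for r in ranges)


offsets = offset_ranges_for("topic", {0: (0, 10), 1: (5, 15), 2: (100, 100)})
print(offsets)
print(message_count(offsets))  # 10 + 10 + 0 = 20
```

With real OffsetRange objects, the resulting list is passed unchanged as the third argument of KafkaUtils.createRDD(sc, kafkaParams, offsets).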

To create a stream that starts from specific offsets, do the following:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

Then create a StreamingContext from your SparkContext (the second argument is the batch interval in seconds):

ssc = StreamingContext(sc, 1)

Next, set up the parameters:

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'

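Then create the fromOffsets dictionary, which maps each TopicAndPartition to its starting offset:

topicPartition = TopicAndPartition(topic, partition)
fromOffset = {topicPartition: long(start)}
# Notice that we must cast the int to long (Python 2); Python 3 has no separate long type, so pass a plain int there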
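The same dictionary can hold one entry per partition when you want several partitions to start at chosen (inclusive) offsets. The sketch below assumes a hypothetical stand-in named tuple in place of pyspark.streaming.kafka.TopicAndPartition so it runs without Spark; the original answer already uses the real class as a dictionary key. The helper `build_from_offsets` is a hypothetical name, and it coerces offsets with int(), which suits Python 3 (on Python 2 the original answer uses long).

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.streaming.kafka.TopicAndPartition(topic, partition).
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])


def build_from_offsets(topic, starts):
    """Map each partition of `topic` to its inclusive starting offset.

    `starts` is {partition: offset}; values are coerced to int.
    """
    return {TopicAndPartition(topic, p): int(o) for p, o in starts.items()}


fromOffset = build_from_offsets("topic", {0: 0, 1: 42, 2: 7})
for tp, off in sorted(fromOffset.items()):
    print(tp.topic, tp.partition, off)
```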

Finally, create the stream:

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams,
                                                  fromOffsets=fromOffset)
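To restart a job later from where it stopped, one common approach is to record the offset ranges of each processed batch (in PySpark, the RDDs produced by the direct stream expose them through rdd.offsetRanges()) and use each range's untilOffset as the next fromOffsets value, since untilOffset is exclusive and therefore is the next offset to read. The sketch below shows only that bookkeeping. It assumes hypothetical stand-in named tuples for OffsetRange and TopicAndPartition so it runs without Spark or Kafka; `next_from_offsets` is a hypothetical helper and the offsets are made-up values.

```python
from collections import namedtuple

# Hypothetical stand-ins mirroring pyspark.streaming.kafka.OffsetRange
# and TopicAndPartition, so the sketch runs without Spark or Kafka.
OffsetRange = namedtuple("OffsetRange", ["topic", "partition", "fromOffset", "untilOffset"])
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])


def next_from_offsets(processed_ranges):
    """Return a fromOffsets dict that resumes right after the processed ranges.

    untilOffset is exclusive, so it is exactly the next offset to read.
    If a partition appears more than once, the highest untilOffset wins.
    """
    resume = {}
    for r in processed_ranges:
        key = TopicAndPartition(r.topic, r.partition)
        resume[key] = max(resume.get(key, r.untilOffset), r.untilOffset)
    return resume


# Offset ranges as they might have been captured from two consecutive batches
# (made-up values for illustration).
batches = [
    [OffsetRange("topic", 0, 0, 10), OffsetRange("topic", 1, 0, 4)],
    [OffsetRange("topic", 0, 10, 25), OffsetRange("topic", 1, 4, 4)],
]
resume = next_from_offsets(r for batch in batches for r in batch)
print(resume)
```

In a real job you would persist this dictionary in a store of your choice and pass it as fromOffsets on the next run. Note also that the stream does not consume anything until you register an output operation and call ssc.start(), usually followed by ssc.awaitTermination().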
Phineas Dashevsky answered Oct 04 '22 09:10

