How to create InputDStream with offsets in PySpark (using KafkaUtils.createDirectStream)?

How do you use KafkaUtils.createDirectStream with offsets for a particular topic in PySpark?

asked Oct 21 '15 by Phineas Dashevsky

People also ask

Which function is used to get current Kafka offsets for an RDD?

You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().
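
As a rough sketch of that pattern (following the Spark 1.x streaming + Kafka integration guide; the broker address and topic name are placeholders, and an existing SparkContext sc is assumed):

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 10)  # assumes an existing SparkContext `sc`
directKafkaStream = KafkaUtils.createDirectStream(
    ssc, ['topic'], {"metadata.broker.list": "host1:9092"})

offsetRanges = []

def storeOffsetRanges(rdd):
    # The underlying KafkaRDD exposes offsetRanges(); capture them here,
    # before any shuffling method breaks the partition mapping.
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream \
    .transform(storeOffsetRanges) \
    .foreachRDD(printOffsetRanges)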

How do you read data from Kafka topic using Pyspark?

Since data read from Kafka carries a "value" attribute, you need a same-named column to write data back to a Kafka topic. If the result consists of multiple columns, condense them into a JSON string and write it to the "value" column; each column's data should be cast to string.
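
A minimal Structured Streaming sketch of that round trip (the broker address, topic names, and checkpoint path are placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct, to_json

spark = SparkSession.builder.appName("kafka-roundtrip").getOrCreate()

# Read: Kafka records arrive with binary "key" and "value" columns.
df = (spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:9092")
      .option("subscribe", "input-topic")
      .load())

# Write: condense all columns into one JSON string named "value".
out = df.select(to_json(struct([col(c) for c in df.columns])).alias("value"))

query = (out.writeStream
         .format("kafka")
         .option("kafka.bootstrap.servers", "host1:9092")
         .option("topic", "output-topic")
         .option("checkpointLocation", "/tmp/checkpoint")
         .start())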

What is spark streaming Kafka maxRatePerPartition?

An important one is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API. Deploying is the same as in the first approach.
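
For instance, a sketch of setting it (the rate value and app name here are arbitrary examples):

from pyspark import SparkConf, SparkContext

# Cap each Kafka partition at 1000 messages/second for the direct API.
conf = (SparkConf()
        .setAppName("direct-kafka-rate-limited")
        .set("spark.streaming.kafka.maxRatePerPartition", "1000"))
sc = SparkContext(conf=conf)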

What is Kafka and spark?

Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.


1 Answer

If you want to create an RDD from records in a Kafka topic, use a static set of offset ranges.

First, make the necessary imports:

from pyspark.streaming.kafka import KafkaUtils, OffsetRange

Then create a dictionary of Kafka parameters pointing at your brokers:

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}

Then create your offset ranges:

start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, start, until)
offsets = [offset]

Finally you create the RDD:

kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
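
The resulting RDD contains (key, value) pairs, so as a quick sanity check you could, for example:

# Hypothetical usage: collect the message payloads from the batch RDD.
for message in kafkaRDD.map(lambda kv: kv[1]).collect():
    print(message)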

To create a stream with offsets, you need to do the following:

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext

Then create your StreamingContext using your SparkContext:

ssc = StreamingContext(sc, 1)  # 1-second batch interval

Next we set up all of our parameters

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'

Then we create our fromOffset dictionary:

topicPartition = TopicAndPartition(topic, partition)
fromOffset = {topicPartition: long(start)}
# note that we must cast the int to long (Python 2 built-in)

Finally, we create the stream:

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams,
                                                  fromOffsets=fromOffset)
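
To actually consume it, attach an output operation and start the context, for example:

directKafkaStream.pprint()  # print a few records from each batch
ssc.start()
ssc.awaitTermination()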

answered Oct 04 '22 by Phineas Dashevsky