How to use KafkaUtils.createDirectStream with the offsets for a particular Topic in PySpark?
You can use transform() instead of foreachRDD() as your first method call in order to access the offsets, and then chain further Spark methods. Be aware, however, that the one-to-one mapping between RDD partitions and Kafka partitions is lost after any method that shuffles or repartitions, e.g. reduceByKey() or window().
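The pattern from the Spark Streaming + Kafka integration guide looks roughly like this (it assumes directKafkaStream is the direct stream created further below):
offsetRanges = []

def store_offset_ranges(rdd):
    # transform() runs before any shuffle, so the KafkaRDD still exposes offsetRanges()
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def print_offset_ranges(rdd):
    for o in offsetRanges:
        print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))

directKafkaStream \
    .transform(store_offset_ranges) \
    .foreachRDD(print_offset_ranges)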
Since records read from Kafka carry a "value" field, the data you write back to a Kafka topic must likewise live in a column named value. If the result consists of multiple columns, condense them into a JSON document, cast it to a string, and write it to the value column; each column's data should be cast to String first.
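As a sketch using the Spark SQL Kafka sink (this assumes the spark-sql-kafka connector is on the classpath and that df is a hypothetical DataFrame holding your result):
from pyspark.sql.functions import to_json, struct, col

# Pack every column of the hypothetical DataFrame df into one JSON string
# and expose it as the mandatory "value" column, cast to string.
out = df.select(to_json(struct(*[col(c) for c in df.columns])).cast("string").alias("value"))

out.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:9092") \
    .option("topic", "topic") \
    .save()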
An important configuration parameter is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API. Deploying: this is the same as in the first approach.
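A minimal sketch of setting it, assuming you build the SparkContext yourself (the value 1000 is purely illustrative):
from pyspark import SparkConf, SparkContext

# Cap the read rate at 1000 messages per second per Kafka partition
# (illustrative value) before creating the context.
conf = SparkConf().set("spark.streaming.kafka.maxRatePerPartition", "1000")
sc = SparkContext(conf=conf)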
Kafka processes events as they unfold, employing a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch approach, dividing incoming streams into small batches for processing.
If you want to create an RDD from records in a Kafka topic, use a static set of offset ranges, as follows.
First, make the necessary imports:
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
Then you create a dictionary of Kafka brokers:
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
Then you create your offset ranges:
start = 0        # first offset to read (inclusive)
until = 10       # offset to stop at (exclusive)
partition = 0
topic = 'topic'
offset = OffsetRange(topic, partition, start, until)
offsets = [offset]
Finally you create the RDD:
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams, offsets)
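With the default decoders, each record comes back as a (key, value) tuple of UTF-8 strings, so, for instance, you can pull out just the values:
values = kafkaRDD.map(lambda kv: kv[1]).collect()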
To create a stream with offsets, you need to do the following:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
Then you create your StreamingContext using your SparkContext:
ssc = StreamingContext(sc, 1)  # 1-second batch interval
Next we set up all of our parameters:
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
Then we create our fromOffsets dictionary:
topicPartition = TopicAndPartition(topic, partition)
fromOffset = {topicPartition: long(start)}
# note that we must cast the int to long (Python 2)
Finally we create the stream:
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams,
                                                  fromOffsets=fromOffset)
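From there you attach your processing and start the context as usual, for example:
directKafkaStream.pprint()  # print the first few records of each batch

ssc.start()
ssc.awaitTermination()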