I have an Apache Spark cluster and a RabbitMQ broker, and I want to consume messages and compute some metrics using the pyspark.streaming module.
The problem is that the only package I found is implemented in Java and Scala. Besides that, I didn't find any example or bridge implementation in Python.
I have a consumer implemented using Pika, but I don't know how to pass the payload to my StreamingContext.
This solution uses the Consumer class from the pika asynchronous consumer example (a .py file) together with the socketTextStream method from Spark Streaming.

Under if __name__ == '__main__': we need to open a socket with the HOST and PORT corresponding to your TCP connection to Spark Streaming. We must save the sendall method of the accepted connection into a variable and pass it to the Consumer class:
import socket

HOST, PORT = 'localhost', 9999  # example values; Spark must connect to this same address

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind((HOST, PORT))
    s.listen(1)
    conn, addr = s.accept()
    dispatcher = conn.sendall  # assigning sendall to the dispatcher variable
    consumer = Consumer(dispatcher)
    try:
        consumer.run()
    except Exception:
        consumer.stop()
        s.close()
Modify the __init__ method in Consumer to receive the dispatcher. The amqp_url parameter keeps the broker URL from the pika example (adjust it for your broker):

def __init__(self, dispatcher, amqp_url='amqp://guest:guest@localhost:5672/%2F'):
    self._connection = None
    self._channel = None
    self._closing = False
    self._consumer_tag = None
    self._url = amqp_url
    # new code: store the socket's sendall so on_message can forward payloads
    self._dispatcher = dispatcher
In the on_message method inside Consumer we call self._dispatcher to send the body of the AMQP message over the socket:

def on_message(self, unused_channel, basic_deliver, properties, body):
    self._channel.basic_ack(basic_deliver.delivery_tag)
    # Spark's socketTextStream splits records on '\n', so terminate each message with a newline
    self._dispatcher(bytes(body.decode("utf-8") + '\n', "utf-8"))
In the Spark application, call ssc.socketTextStream(HOST, int(PORT)) with the HOST and PORT corresponding to our TCP socket. Spark will manage the connection.
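For reference, here is a minimal sketch of the Spark side. The address and the per-batch message count are assumptions for illustration; substitute your own host, port, and metric logic:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

HOST, PORT = 'localhost', 9999  # assumed: must match the socket opened by the consumer

sc = SparkContext(appName="RabbitMQMetrics")
ssc = StreamingContext(sc, 5)  # 5-second micro-batches

lines = ssc.socketTextStream(HOST, int(PORT))
lines.count().pprint()  # example metric: number of messages per batch

ssc.start()
ssc.awaitTermination()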
Run the consumer first, then the Spark application; the consumer must already be listening when Spark tries to connect.
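For example, assuming the consumer lives in consumer.py and the Spark job in streaming_app.py (hypothetical file names, not from the original answer):

python consumer.py              # opens the socket and blocks in s.accept()
spark-submit streaming_app.py   # connects to the socket and starts the stream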