I have an Apache Spark cluster and a RabbitMQ broker and I want to consume messages and compute some metrics using the pyspark.streaming module.
The problem is that I only found this package, but it is implemented in Java and Scala. Besides that, I didn't find any example or bridge implementation in Python.
I have a consumer implemented using Pika but I don't know how to pass the payload to my StreamingContext.
This solution uses the Pika asynchronous consumer example and the socketTextStream method from Spark Streaming.
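The core idea is that the Pika consumer acts as a small TCP server, and Spark's socketTextStream receiver connects to it as a client and reads one record per '\n'-terminated line. The sketch below shows only that framing, using the standard library with no RabbitMQ or Spark involved: a thread plays the consumer side and pushes bodies through sendall, and read_lines stands in for Spark's receiver. The helper names serve_messages, read_lines and demo, and the sample payloads, are hypothetical.

```python
import socket
import threading


def serve_messages(server_sock, messages):
    """Consumer side: accept one client and push each body as a
    '\n'-terminated line, the framing socketTextStream expects."""
    conn, _ = server_sock.accept()
    with conn:
        dispatcher = conn.sendall  # same sendall-as-callable trick as the answer
        for body in messages:
            dispatcher(body + b"\n")
    # leaving the with-block closes conn, which gives the reader EOF


def read_lines(host, port):
    """Stand-in for Spark's receiver: connect and collect decoded lines."""
    with socket.create_connection((host, port)) as client:
        with client.makefile("r", encoding="utf-8") as stream:
            return [line.rstrip("\n") for line in stream]


def demo(messages):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.bind(("127.0.0.1", 0))  # port 0: let the OS choose a free port
        server.listen(1)
        host, port = server.getsockname()
        sender = threading.Thread(target=serve_messages, args=(server, messages))
        sender.start()
        lines = read_lines(host, port)
        sender.join()
    return lines


received = demo([b"cpu=0.42", b"mem=0.73"])
```

Binding to port 0 keeps the demo from colliding with a real job; in the actual setup both sides use the same fixed HOST and PORT, and the consumer must be listening before the receiver connects.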
Start from the example's .py file and its Consumer class. Under if __name__ == '__main__':, open a listening socket on the HOST and PORT that Spark Streaming will connect to, accept the connection, save the connection's sendall method in a variable, and pass that variable to the Consumer class:
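import socket

HOST, PORT = 'localhost', 9999  # example values; Spark's socketTextStream must use the same ones
AMQP_URL = 'amqp://guest:guest@localhost:5672/%2F'  # example broker URL

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
  s.bind((HOST, PORT))
  s.listen(1)
  conn, addr = s.accept()  # blocks until Spark's receiver connects
  dispatcher = conn.sendall  # assigning sendall to dispatcher variable
  consumer = Consumer(AMQP_URL, dispatcher)
  try:
    consumer.run()
  except KeyboardInterrupt:  # Ctrl+C is not an Exception subclass
    consumer.stop()
  finally:
    conn.close()
Modify the __init__ method in Consumer so that it also accepts the dispatcher:
def __init__(self, amqp_url, dispatcher):
  self._connection = None
  self._channel = None
  self._closing = False
  self._consumer_tag = None
  self._url = amqp_url
  # new code: callable used to forward each message body to Spark
  self._dispatcher = dispatcher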
In the Consumer's on_message method, call self._dispatcher to send the body of each AMQP message to Spark:
def on_message(self, unused_channel, basic_deliver, properties, body):
  # socketTextStream splits records on '\n', so each message must end with one
  self._dispatcher((body.decode("utf-8") + '\n').encode("utf-8"))
  # acknowledge only after the body was handed to Spark, so a failed
  # send leaves the message unacknowledged instead of silently dropping it
  self._channel.basic_ack(basic_deliver.delivery_tag)
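Because the dispatcher is just a callable, on_message can be exercised without a socket or a broker. The sketch below uses a hypothetical, trimmed-down Consumer (only a constructor and an on_message that forwards the body before acknowledging it), a FakeChannel that records acknowledgements, and list.append in place of conn.sendall. All of these are illustrative stand-ins, not Pika API.

```python
import types


class Consumer:
    """Hypothetical, trimmed-down stand-in for the Pika example's Consumer:
    only the attributes and the on_message callback discussed here are kept."""

    def __init__(self, amqp_url, dispatcher):
        self._url = amqp_url
        self._channel = None
        self._dispatcher = dispatcher  # any callable that accepts bytes

    def on_message(self, unused_channel, basic_deliver, properties, body):
        # socketTextStream splits records on '\n', so terminate each body
        self._dispatcher((body.decode("utf-8") + "\n").encode("utf-8"))
        # ack only after the body has been handed to the dispatcher
        self._channel.basic_ack(basic_deliver.delivery_tag)


class FakeChannel:
    """Records acknowledged delivery tags instead of talking to RabbitMQ."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


sent = []  # list.append plays the role of conn.sendall
consumer = Consumer("amqp://guest:guest@localhost:5672/%2F", sent.append)
consumer._channel = FakeChannel()
deliver = types.SimpleNamespace(delivery_tag=7)  # mimics basic_deliver
consumer.on_message(None, deliver, None, "temp=21.5".encode("utf-8"))
```

Injecting the send function rather than the socket itself is what makes this kind of test possible: the Consumer never needs to know that the bytes end up on a TCP connection.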
In the Spark application, call ssc.socketTextStream(HOST, int(PORT)) with the same HOST and PORT as the consumer's TCP socket. Spark opens the connection as a client and manages it.
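A minimal sketch of the Spark side, assuming each message is a line of plain text and that the metrics wanted are per-batch message counts and word counts (both are assumptions; replace tokenize with your own parsing). HOST, PORT, BATCH_SECONDS, tokenize, build_metrics and main are hypothetical names; socketTextStream, count, flatMap, map, reduceByKey and pprint are standard DStream operations in pyspark.streaming. pyspark is imported inside main() so the helpers can be loaded without Spark installed.

```python
# Example values: must match the socket the Pika consumer listens on.
HOST, PORT = "localhost", 9999
BATCH_SECONDS = 5  # example batch interval


def tokenize(line):
    """Split one message (one line from the socket) into lowercase words."""
    return line.lower().split()


def build_metrics(ssc, host=HOST, port=PORT):
    """Wire a socketTextStream to two simple per-batch metrics."""
    lines = ssc.socketTextStream(host, int(port))
    message_count = lines.count()  # number of messages in each batch
    word_counts = (lines.flatMap(tokenize)
                        .map(lambda word: (word, 1))
                        .reduceByKey(lambda a, b: a + b))
    return message_count, word_counts


def main():
    # Imported here so tokenize/build_metrics can be used without Spark.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="RabbitMQSocketBridge")
    ssc = StreamingContext(sc, BATCH_SECONDS)
    message_count, word_counts = build_metrics(ssc)
    message_count.pprint()
    word_counts.pprint()
    ssc.start()
    ssc.awaitTermination()
```

Call main() from the script you pass to spark-submit. socketTextStream uses a receiver that occupies one core, so when running locally use a master such as local[2] rather than local; on a cluster the receiver runs on an executor, so HOST must be reachable from that executor, not only from the driver.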
Run the consumer first (it blocks in accept() until a client connects) and then start the Spark application.