 

SparkStreaming, RabbitMQ and MQTT in python using pika

Just to make things tricky, I'd like to consume messages from a RabbitMQ queue. I know there is an MQTT plugin for RabbitMQ (https://www.rabbitmq.com/mqtt.html).

However, I cannot get an example to work in which Spark consumes a message that was produced with pika.

For example, I am starting from the simple wordcount.py program here (https://spark.apache.org/docs/1.2.0/streaming-programming-guide.html) to see if I can consume messages from a producer written in the following way:

import sys
import pika
import json
import future
import pprofile

def sendJson(json):

  connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
  channel = connection.channel()

  channel.queue_declare(queue='analytics', durable=True)
  channel.queue_bind(exchange='analytics_exchange',
                       queue='analytics')

  channel.basic_publish(exchange='analytics_exchange', routing_key='analytics',body=json)
  connection.close()

if __name__ == "__main__":
  with open(sys.argv[1],'r') as json_file:
    sendJson(json_file.read())

The Spark Streaming consumer is the following:

import sys
import operator

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils

sc = SparkContext(appName="SS")
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint")
#ssc.setLogLevel("ERROR")


#RabbitMQ

"""EXCHANGE = 'analytics_exchange'
EXCHANGE_TYPE = 'direct'
QUEUE = 'analytics'
ROUTING_KEY = 'analytics'
RESPONSE_ROUTING_KEY = 'analytics-response'
"""


brokerUrl = "localhost:5672" # "tcp://iot.eclipse.org:1883"
topic = "analytics"

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
#dummy functions - nothing interesting...
words = mqttStream.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word, 1))
wordCounts = pairs.reduceByKey(lambda x, y: x + y)

wordCounts.pprint()
ssc.start()
ssc.awaitTermination()

However, unlike the simple wordcount example, I cannot get this to work, and I get the following error:

16/06/16 17:41:35 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 8)
java.lang.NullPointerException
    at org.eclipse.paho.client.mqttv3.MqttConnectOptions.validateURI(MqttConnectOptions.java:457)
    at org.eclipse.paho.client.mqttv3.MqttAsyncClient.<init>(MqttAsyncClient.java:273)

So my questions are: what settings should I pass to MQTTUtils.createStream(ssc, brokerUrl, topic) to listen to the queue, are there any fuller examples, and how do these settings map onto their RabbitMQ equivalents?

I am running my consumer code with: ./bin/spark-submit ../../bb/code/skunkworks/sparkMQTTRabbit.py

I have updated the producer code as follows with TCP parameters as suggested by one comment:

url_location = 'tcp://localhost'
url = os.environ.get('', url_location)
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)

and the Spark Streaming consumer to:

brokerUrl = "tcp://127.0.0.1:5672"
topic = "#" #all messages

mqttStream = MQTTUtils.createStream(ssc, brokerUrl, topic)
records = mqttStream.flatMap(lambda line: json.loads(line))
count = records.map(lambda rec: len(rec))
total = count.reduce(lambda a, b: a + b)
total.pprint()
disruptive asked Jun 16 '16 15:06


2 Answers

It looks like you are using the wrong port number. Port 5672 is the AMQP port; the MQTT plugin listens on 1883 by default. Assuming that:

  • you have a local instance of RabbitMQ running with default settings, you have enabled the MQTT plugin (rabbitmq-plugins enable rabbitmq_mqtt), and you have restarted the RabbitMQ server
  • you have included spark-streaming-mqtt when executing spark-submit / pyspark (either with packages or jars / driver-class-path)

you can connect over TCP with tcp://localhost:1883. Also remember that the MQTT plugin uses the amq.topic exchange, so a pika producer has to publish to that exchange for MQTT subscribers to see its messages.
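Because the MQTT plugin routes through a topic exchange, an MQTT topic and an AMQP routing key are two spellings of the same thing: the plugin treats "/" in MQTT topics as "." in routing keys, and the MQTT single-level wildcard "+" corresponds to "*" in AMQP bindings ("#" is the same in both). A minimal sketch of that translation follows; the function name is hypothetical, and it assumes topics use "/" only as a segment separator and contain no literal dots.

```python
def mqtt_topic_to_amqp_key(topic):
    """Translate an MQTT topic or topic filter into an AMQP routing/binding key.

    Hypothetical helper for illustration only: assumes "/" is used purely
    as a segment separator and that no segment contains a literal ".".
    """
    segments = topic.split("/")
    # "+" (MQTT single-level wildcard) becomes "*" (AMQP single-word wildcard);
    # "#" means "everything below" in both, so it is left unchanged.
    translated = ["*" if segment == "+" else segment for segment in segments]
    return ".".join(translated)


if __name__ == "__main__":
    for topic in ("hello", "sensors/+/temperature", "#"):
        print(topic, "->", mqtt_topic_to_amqp_key(topic))
```

So a pika producer publishing to amq.topic with routing_key='hello' should be visible to an MQTT subscriber on topic "hello", which is the pairing used in the quick start.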

Quick start:

  • create Dockerfile with following content:

    FROM rabbitmq:3-management
    
    RUN rabbitmq-plugins enable rabbitmq_mqtt
    
  • build Docker image:

    docker build -t rabbit_mqtt .
    
  • start image and wait until server is ready:

    docker run -p 15672:15672 -p 5672:5672 -p 1883:1883 rabbit_mqtt 
    
  • create producer.py with following content:

    import pika
    import time 
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()
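    # amq.topic is predeclared by RabbitMQ; this call only asserts its settings.
    # exchange_type is the current pika keyword (older releases also accepted type=).
    channel.exchange_declare(exchange='amq.topic',
                     exchange_type='topic', durable=True)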
    
    for i in range(1000):
        channel.basic_publish(
            exchange='amq.topic',  # amq.topic as exchange
            routing_key='hello',   # Routing key used by producer
            body='Hello World {0}'.format(i)
        )
        time.sleep(3)
    
    connection.close()
    
  • start producer

    python producer.py
    

    and visit the management console at http://127.0.0.1:15672/#/exchanges/%2F/amq.topic

    to check that the messages are being received.

  • create consumer.py with following content:

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.mqtt import MQTTUtils
    
    sc = SparkContext()
    ssc = StreamingContext(sc, 10)
    
    mqttStream = MQTTUtils.createStream(
        ssc, 
        "tcp://localhost:1883",  # Note both port number and protocol
        "hello"                  # The same routing key as used by producer
    )
    mqttStream.count().pprint()
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
    
  • download dependencies (adjust Scala version to the one used to build Spark and Spark version):

    mvn dependency:get -Dartifact=org.apache.spark:spark-streaming-mqtt_2.11:1.6.1
    
  • make sure SPARK_HOME and PYTHONPATH point to the correct directories.

  • submit consumer.py with (adjust versions as before):

    spark-submit --packages org.apache.spark:spark-streaming-mqtt_2.11:1.6.1 consumer.py
    

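If you followed all the steps, you should see the number of Hello World messages received in each batch printed in the Spark output (the consumer prints mqttStream.count(), not the message bodies).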
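Once messages arrive, the JSON handling in the question's updated consumer needs some care: flatMap(lambda line: json.loads(line)) iterates over whatever json.loads returns, so a JSON object would be flattened into its keys. A minimal sketch of a safer parser follows, assuming each MQTT payload is a string holding either one JSON object or a JSON array of records; parse_records is a hypothetical name, not part of Spark or pika.

```python
import json


def parse_records(payload):
    """Return the records in one message payload as a list.

    Hypothetical helper: a JSON array yields its elements, any other JSON
    value yields a one-element list, and invalid JSON yields an empty list
    so that a single bad message does not fail the whole batch.
    """
    try:
        decoded = json.loads(payload)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return []
    if isinstance(decoded, list):
        return decoded
    return [decoded]


if __name__ == "__main__":
    print(parse_records('{"user": "a", "clicks": 3}'))
    print(parse_records('[{"user": "a"}, {"user": "b"}]'))
    print(parse_records('not json'))
```

In the consumer this could be used as records = mqttStream.flatMap(parse_records), so that each element of records is one whole record rather than a dictionary key.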

zero323 answered Sep 22 '22 16:09


From the MqttAsyncClient Javadoc, the server URI must have one of the following schemes: tcp://, ssl://, or local://. You need to change your brokerUrl above to have one of these schemes.

For more information, here's a link to the source for MqttAsyncClient:

https://github.com/eclipse/paho.mqtt.java/blob/master/org.eclipse.paho.client.mqttv3/src/main/java/org/eclipse/paho/client/mqttv3/MqttAsyncClient.java#L272
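A quick way to catch this before submitting the Spark job is to check the broker URL yourself. The sketch below only mirrors the scheme rule described above (tcp://, ssl://, local://); it is not Paho's actual validation code, and the function name is an assumption made for illustration.

```python
# Schemes listed in the MqttAsyncClient Javadoc quoted above.
SUPPORTED_SCHEMES = ("tcp://", "ssl://", "local://")


def has_supported_scheme(broker_url):
    """Return True if broker_url starts with one of the listed schemes.

    Hypothetical pre-flight check; it does not verify the host or the port.
    """
    return broker_url.startswith(SUPPORTED_SCHEMES)


if __name__ == "__main__":
    for url in ("localhost:5672", "tcp://127.0.0.1:5672", "tcp://localhost:1883"):
        print(url, has_supported_scheme(url))
```

The original brokerUrl, "localhost:5672", fails this check because it has no scheme. "tcp://127.0.0.1:5672" passes it but still points at the AMQP port, so the scheme is only one of the two things to fix.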

ck1 answered Sep 20 '22 16:09