I added the dependency given at the following link to Spark's pom.xml:
http://bahir.apache.org/docs/spark/current/spark-sql-streaming-mqtt/
I then rebuilt Spark with Maven. However, that page only shows Java and Scala examples for reading data from MQTT.
I want to read streaming data from MQTT in Python. Earlier Spark versions had pyspark.streaming.mqtt for this. What is the equivalent in PySpark for Spark 2.2.0? I am using Mosquitto as the MQTT broker.
In PySpark you can use the Structured Streaming source instead (you have to include the Bahir jar on the classpath):
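from pyspark.sql import SparkSession
# Hypothetical values: adjust to your Mosquitto broker (host:port) and topic
broker_uri = "localhost:1883"
topic = "mytopic"
spark = SparkSession.builder.getOrCreate()  # type: SparkSession
lines = (spark
    .readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("topic", topic)  # MQTT topic the source subscribes to
    .load("tcp://{}".format(broker_uri)))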
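The snippet above only defines a streaming DataFrame; nothing is read from the broker until a streaming query is started. Below is a minimal sketch of a complete script, assuming Spark 2.2.0 built for Scala 2.11 and the Bahir 2.2.0 release of the MQTT source. The Maven coordinates `org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0`, the broker address `localhost:1883`, the topic `mytopic` and the helper names `bahir_mqtt_package`, `read_mqtt_stream` and `main` are assumptions for illustration. Instead of editing pom.xml and rebuilding Spark, it asks Spark to fetch the jar through `spark.jars.packages`, sets the `topic` option the Bahir source uses to pick what to subscribe to, and prints each micro-batch with the console sink.

```python
# Minimal sketch, assuming Spark 2.2.0 (Scala 2.11) and Bahir 2.2.0.
# Package coordinates, broker address and topic are assumptions to adjust.

BAHIR_MQTT_PROVIDER = "org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider"


def bahir_mqtt_package(scala_version="2.11", bahir_version="2.2.0"):
    """Build the (assumed) Maven coordinates of the Bahir MQTT source."""
    return "org.apache.bahir:spark-sql-streaming-mqtt_{}:{}".format(
        scala_version, bahir_version)


def read_mqtt_stream(spark, broker_uri, topic):
    """Return a streaming DataFrame fed by an MQTT topic (broker_uri is host:port)."""
    return (spark.readStream
            .format(BAHIR_MQTT_PROVIDER)
            .option("topic", topic)
            .load("tcp://{}".format(broker_uri)))


def main(broker_uri="localhost:1883", topic="mytopic"):
    # Imported here so the helpers above can be used without PySpark installed.
    from pyspark.sql import SparkSession

    # spark.jars.packages makes Spark resolve the Bahir jar and its
    # dependencies from Maven when the JVM is launched.
    spark = (SparkSession.builder
             .appName("mqtt-stream")
             .config("spark.jars.packages", bahir_mqtt_package())
             .getOrCreate())

    messages = read_mqtt_stream(spark, broker_uri, topic)

    # Start the query: each micro-batch of messages is printed to stdout.
    query = (messages.writeStream
             .format("console")
             .outputMode("append")
             .start())
    query.awaitTermination()
```

Call `main()` at the end of the script and start it with plain `python`, so the package setting is applied when the JVM starts. If you launch with `spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.2.0 your_script.py` instead, the builder setting is redundant.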