I was digging into the possibilities for Websphere MQ as a data source for spark-streaming becuase it is needed in one of our use case. I got to know that MQTT is the protocol that supports the communication from MQ data structures but since I am a newbie to spark streaming I need some working examples for the same. Did anyone try to connect the MQ with spark streaming. Please devise the best way for doing so.
Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, and Kinesis, or by applying high-level operations on other DStreams.
Spark Streaming engine processes incoming data from various input sources. Input sources generate data like Kafka, Flume, HDFS/S3/any file system, etc. Sinks store processed data from Spark Streaming engines like HDFS/File System, relational databases, or NoSQL DB's.
The main use of IBM WebSphere MQ is to send or exchange messages. One application puts a message on a queue on one computer, and another application gets the same message from another queue on a different computer.
Spark Streaming comes with several API methods that are useful for processing data streams. There are RDD-like operations like map, flatMap, filter, count, reduce, groupByKey, reduceByKey, sortByKey , and join. It also provides additional API to process the streaming data based on window and stateful operations.
So, I am posting here the working code for CustomMQReceiver which connects the Websphere MQ and reads data :
public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;
Enumeration enumeration =null;
public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
super(StorageLevel.MEMORY_ONLY_2());
this.host = host;
this.port = port;
this.qm=qm;
this.qn=qn;
this.channel=channel;
}
public void onStart() {
// Start the thread that receives data over a connection
new Thread() {
@Override public void run() {
try {
initConnection();
receive();
}
catch (JMSException ex)
{
ex.printStackTrace();
}
}
}.start();
}
public void onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
System.out.print("Started receiving messages from MQ");
try {
JMSMessage receivedMessage= null;
while (!isStopped() && enumeration.hasMoreElements() )
{
receivedMessage= (JMSMessage) enumeration.nextElement();
String userInput = convertStreamToString(receivedMessage);
//System.out.println("Received data :'" + userInput + "'");
store(userInput);
}
// Restart in an attempt to connect again when server is active again
//restart("Trying to connect again");
stop("No More Messages To read !");
qCon.close();
System.out.println("Queue Connection is Closed");
}
catch(Exception e)
{
e.printStackTrace();
restart("Trying to connect again");
}
catch(Throwable t) {
// restart if there is any other error
restart("Error receiving data", t);
}
}
public void initConnection() throws JMSException
{
MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
conFactory.setHostName(host);
conFactory.setPort(port);
conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
conFactory.setQueueManager(qm);
conFactory.setChannel(channel);
qCon= (MQQueueConnection) conFactory.createQueueConnection();
MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
MQQueue queue=(MQQueue) qSession.createQueue(qn);
MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
qCon.start();
enumeration= browser.getEnumeration();
}
@Override
public StorageLevel storageLevel() {
return StorageLevel.MEMORY_ONLY_2();
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With