Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Websphere MQ as a data source for Apache Spark Streaming

I was digging into the possibilities for Websphere MQ as a data source for spark-streaming becuase it is needed in one of our use case. I got to know that MQTT is the protocol that supports the communication from MQ data structures but since I am a newbie to spark streaming I need some working examples for the same. Did anyone try to connect the MQ with spark streaming. Please devise the best way for doing so.

like image 445
tom Avatar asked May 25 '15 08:05

tom


People also ask

What sources can the data in Spark Streaming come from?

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, and Kinesis, or by applying high-level operations on other DStreams.

Which can act as a data sink for Spark Streaming?

Spark Streaming engine processes incoming data from various input sources. Input sources generate data like Kafka, Flume, HDFS/S3/any file system, etc. Sinks store processed data from Spark Streaming engines like HDFS/File System, relational databases, or NoSQL DB's.

What is WebSphere MQ used for?

The main use of IBM WebSphere MQ is to send or exchange messages. One application puts a message on a queue on one computer, and another application gets the same message from another queue on a different computer.

What are some of the ways of processing streaming data in Apache spark?

Spark Streaming comes with several API methods that are useful for processing data streams. There are RDD-like operations like map, flatMap, filter, count, reduce, groupByKey, reduceByKey, sortByKey , and join. It also provides additional API to process the streaming data based on window and stateful operations.


1 Answers

So, I am posting here the working code for CustomMQReceiver which connects the Websphere MQ and reads data :

public class CustomMQReciever extends Receiver<String> { String host = null;
int port = -1;
String qm=null;
String qn=null;
String channel=null;
transient Gson gson=new Gson();
transient MQQueueConnection qCon= null;

Enumeration enumeration =null;

public CustomMQReciever(String host , int port, String qm, String channel, String qn) {
    super(StorageLevel.MEMORY_ONLY_2());
    this.host = host;
    this.port = port;
    this.qm=qm;
    this.qn=qn;
    this.channel=channel;

}

public void onStart() {
    // Start the thread that receives data over a connection
    new Thread()  {
        @Override public void run() {
            try {
                initConnection();
                receive();
            }
            catch (JMSException ex)
            {
                ex.printStackTrace();
            }
        }
    }.start();
}
public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself isStopped() returns false
}

 /** Create a MQ connection and receive data until receiver is stopped */
private void receive() {
  System.out.print("Started receiving messages from MQ");

    try {

    JMSMessage receivedMessage= null;

        while (!isStopped() && enumeration.hasMoreElements() )
        {

            receivedMessage= (JMSMessage) enumeration.nextElement();
            String userInput = convertStreamToString(receivedMessage);
            //System.out.println("Received data :'" + userInput + "'");
            store(userInput);
        }

        // Restart in an attempt to connect again when server is active again
        //restart("Trying to connect again");

        stop("No More Messages To read !");
        qCon.close();
        System.out.println("Queue Connection is Closed");

    }
    catch(Exception e)
    {
        e.printStackTrace();
        restart("Trying to connect again");
    }
    catch(Throwable t) {
        // restart if there is any other error
        restart("Error receiving data", t);
    }
    }

  public void initConnection() throws JMSException
{
    MQQueueConnectionFactory conFactory= new MQQueueConnectionFactory();
    conFactory.setHostName(host);
    conFactory.setPort(port);
    conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
    conFactory.setQueueManager(qm);
    conFactory.setChannel(channel);


    qCon= (MQQueueConnection) conFactory.createQueueConnection();
    MQQueueSession qSession=(MQQueueSession) qCon.createQueueSession(false, 1);
    MQQueue queue=(MQQueue) qSession.createQueue(qn);
    MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
    qCon.start();

    enumeration= browser.getEnumeration();
   }

 @Override
public StorageLevel storageLevel() {
    return StorageLevel.MEMORY_ONLY_2();
}
}
like image 96
tom Avatar answered Oct 28 '22 14:10

tom