 

Storm : Spout for reading data from a port

I need to write a Storm spout that reads data from a port, and I want to know whether that is logically possible.

With that in mind, I designed a simple topology for this with one spout and one bolt. The spout gathers HTTP requests sent using wget, and the bolt displays each request. Just that.

My spout structure is as follows:

public class ProxySpout extends BaseRichSpout{
         //The O/P collector
         SpoutOutputCollector sc;
         //The socket
         Socket clientSocket;
         //The server socket
         ServerSocket sc;

         public ProxySpout(int port){
            this.sc=new ServerSocket(port);
            try{
                clientSocket=sc.accept();
            }catch(IOException ex){
                //Handle it
            }
         }

         public void nextTuple(){
            try{
                InputStream ic=clientSocket.getInputStream();
            byte[] b=new byte[8196];
                int len=ic.read(b);

                sc.emit(new Values(b));
                ic.close();
            }catch(//){
                //Handle it
            }finally{
                clientSocket.close();
            }
         }
}

I have implemented the rest of the methods too.

When I turn this into a topology and run it, I get an error when I send the first request:

java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket

I just need to know whether there is something wrong with the way I am implementing this spout. Is it even possible for a spout to collect data from a port, or for a spout to act as a proxy?

Edit

Got it working.

The code is:

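public class ProxySpout extends BaseRichSpout{
    //The O/P collector
    private transient SpoutOutputCollector _collector;
    //transient (rather than static): skipped by serialization, created in open()
    //on the worker, and not shared between spout instances in the same JVM
    private transient ServerSocket _serverSocket;
    private final int _port;

    public ProxySpout(int port){
        _port=port;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
        _collector=collector;
        try{
            _serverSocket=new ServerSocket(_port);
        }catch(IOException ex){
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void nextTuple(){
        //accept() blocks until a client such as wget connects
        try(Socket clientSocket=_serverSocket.accept()){
            InputStream incomingIS=clientSocket.getInputStream();
            byte[] b=new byte[8196];
            int len=incomingIS.read(b);
            if(len>0){
                //emit only the bytes actually read, not the whole buffer
                _collector.emit(new Values(Arrays.copyOf(b, len)));
            }
        }catch(IOException ex){
            //Handle it
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer){
        declarer.declare(new Fields("request")); //field name is illustrative
    }
}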

As per @Shaw's suggestion, I initialized _serverSocket in the open() method, and the client socket is accepted in nextTuple() to listen for requests.

I don't know the performance metrics of this one, but it works. :-)

asked May 13 '14 at 16:05 by Mkl Rjv
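The working version above still blocks inside nextTuple() on accept(). Storm's ISpout documentation asks that nextTuple() be non-blocking, because Storm calls a spout's methods on a single thread. One common workaround is to accept connections on a background thread and hand the data over through a queue. Below is a minimal, JDK-only sketch of that idea; the class PortReader and its methods are hypothetical helpers, not part of Storm's API, and like the question it does a single read() per connection, which may not capture a large HTTP request in full.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: accepts connections on a daemon thread and queues the
// bytes read from each one, so the caller never blocks on accept().
class PortReader {
    private final LinkedBlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
    private final ServerSocket serverSocket;

    PortReader(int port) {
        try {
            serverSocket = new ServerSocket(port); // port 0 picks any free port
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Thread acceptor = new Thread(this::acceptLoop, "port-reader");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int localPort() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try (Socket client = serverSocket.accept()) {
                InputStream in = client.getInputStream();
                byte[] buf = new byte[8192];
                int len = in.read(buf); // one read per connection, as in the question
                if (len > 0) {
                    queue.offer(Arrays.copyOf(buf, len)); // keep only the bytes read
                }
            } catch (IOException e) {
                // accept() throws once close() is called; the loop condition then exits
            }
        }
    }

    // Non-blocking: returns null when nothing has arrived yet.
    byte[] poll() {
        return queue.poll();
    }

    void close() {
        try {
            serverSocket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a spout, one would keep a transient PortReader field, create it in open(), call poll() in nextTuple() and emit only when the result is non-null, and call close() from the spout's close() method. The queue decouples the blocking socket work from Storm's spout thread.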






1 Answer

In the constructor, just assign the variables. Instantiate the ServerSocket in the open() method (the spout counterpart of a bolt's prepare() method), and do not create any objects with new in the constructor. Also rename your variables: you have two variables named sc.

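import java.io.IOException;
import java.net.ServerSocket;
import java.util.Map;
// Storm 1.x+ package names; releases before 1.0 used backtype.storm.*
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;

public class ProxySpout extends BaseRichSpout{

    // Only plain, serializable state is set in the constructor
    int port;
    // transient: created in open() after deployment, so never serialized
    transient ServerSocket serverSocket;
    transient SpoutOutputCollector collector;

    public ProxySpout(int port){
        this.port=port;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)  {
        this.collector=collector;
        try {
            serverSocket=new ServerSocket(port);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void nextTuple() {
        // accept a connection, read from it and emit via collector
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // field name "request" is illustrative
        declarer.declare(new Fields("request"));
    }
}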

If you create it in the open() method, it is only called after the spout has been deployed, so the socket never needs to be serialized; and open() is called only once per lifetime of the spout, so it is not inefficient.

answered Oct 17 '22 at 16:10 by gasparms
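The answer's reasoning comes down to Java serialization: roughly, Storm serializes spout and bolt objects when the topology is submitted and deserializes them on the workers, so every non-transient field must be Serializable at that point. The Storm-free sketch below reproduces the question's NotSerializableException and shows why deferring the socket to an open()-style method with a transient field avoids it. The classes EagerHolder, LazyHolder and SerializationCheck are hypothetical stand-ins for the spout, and the port value is arbitrary.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.Socket;

// Like the question's spout: a non-transient Socket created in the constructor.
class EagerHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    Socket clientSocket = new Socket(); // unconnected, but still not Serializable
}

// Like the answer's spout: only the port is serialized; the socket is created
// later and is transient, so serialization skips it.
class LazyHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    int port;
    transient Socket clientSocket;

    LazyHolder(int port) {
        this.port = port;
    }

    // Stands in for Storm's open(), which runs after deserialization on the worker.
    void open() {
        clientSocket = new Socket();
    }
}

class SerializationCheck {
    // True if Java serialization succeeds, false on NotSerializableException.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Static fields are skipped by serialization as well, which is likely why the question's static version worked, but a static field is shared by every instance of the spout running in the same worker JVM; transient keeps the state per instance.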