I need to write a Storm spout that reads data from a port, and I wanted to know whether that is logically possible.
With that in mind, I designed a simple topology with one spout and one bolt: the spout gathers HTTP requests sent using wget, and the bolt displays the request, just that.
My spout structure is as follows:
public class ProxySpout extends BaseRichSpout {
    //The O/P collector
    SpoutOutputCollector sc;
    //The socket
    Socket clientSocket;
    //The server socket
    ServerSocket sc;

    public ProxySpout(int port) {
        try {
            this.sc = new ServerSocket(port);
            clientSocket = sc.accept();
        } catch (IOException ex) {
            //Handle it
        }
    }

    public void nextTuple() {
        try {
            InputStream ic = clientSocket.getInputStream();
            byte[] b = new byte[8196];
            int len = ic.read(b);
            sc.emit(new Values(b));
            ic.close();
        } catch (IOException ex) {
            //Handle it
        } finally {
            clientSocket.close();
        }
    }
}
I have implemented the rest of the methods too.
When I turn this into a topology and run it, I get an error when I send the first request:
java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket
I just need to know whether there is something wrong with the way I am implementing this spout. Is it even possible for a spout to collect data from a port? Or for a spout to act as an instance of a proxy?
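The exception occurs because Storm serializes spout instances with Java serialization when the topology is submitted, and `java.net.Socket` is not serializable. The effect can be reproduced with plain JDK classes, no Storm required; the class and field names below are illustrative, chosen to mirror the spout above:

```java
import java.io.*;
import java.net.Socket;

public class SerializationDemo {
    // Mirrors the broken spout: a Socket field makes the whole object unserializable.
    static class BrokenSpoutState implements Serializable {
        Socket clientSocket = new Socket(); // unconnected socket, field only
    }

    // Mirrors the fix: transient fields are skipped by Java serialization;
    // in a Storm spout you would (re)create such resources in open(), after deployment.
    static class FixedSpoutState implements Serializable {
        transient Socket clientSocket = new Socket();
        int port = 8080; // plain data is fine to serialize
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new BrokenSpoutState())); // false
        System.out.println(serializes(new FixedSpoutState()));  // true
    }
}
```

Anything held in a spout field must either be serializable (like the port number) or be created after deployment, which is exactly what moving construction into `open()` achieves.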
Edit
Got it working.
The code is:
public class ProxySpout extends BaseRichSpout {
    //The O/P collector
    static SpoutOutputCollector _collector;
    //The socket
    static Socket _clientSocket;
    static ServerSocket _serverSocket;
    static int _port;

    public ProxySpout(int port) {
        _port = port;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        try {
            _serverSocket = new ServerSocket(_port);
        } catch (IOException ex) {
            //Handle it
        }
    }

    public void nextTuple() {
        try {
            _clientSocket = _serverSocket.accept();
            InputStream incomingIS = _clientSocket.getInputStream();
            byte[] b = new byte[8196];
            int len = incomingIS.read(b);
            _collector.emit(new Values(b));
        } catch (IOException ex) {
            //Handle it
        }
    }
}
As per @Shaw's suggestion, I now initialize _serverSocket in the open() method, and _clientSocket accepts connections inside the nextTuple() method, listening for requests there.
I don't know the performance characteristics of this approach, but it works. :-)
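Outside of Storm, the accept-then-read step that nextTuple() performs can be exercised with plain JDK sockets to check the logic in isolation. A minimal, self-contained sketch (PortReadDemo, readOneRequest, and roundTrip are illustrative names; the port is chosen by the OS):

```java
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;

public class PortReadDemo {
    // One "nextTuple" pass: accept a connection and read up to 8196 bytes.
    static byte[] readOneRequest(ServerSocket server) throws IOException {
        try (Socket client = server.accept();
             InputStream in = client.getInputStream()) {
            byte[] buf = new byte[8196];
            int len = in.read(buf); // blocks until some bytes arrive
            byte[] data = new byte[Math.max(len, 0)];
            System.arraycopy(buf, 0, data, 0, data.length);
            return data;
        }
    }

    // Starts a client thread that sends one request, then reads it server-side.
    static String roundTrip() throws Exception {
        try (ServerSocket server = new ServerSocket(0)) { // port 0: OS picks a free port
            int port = server.getLocalPort();
            Thread sender = new Thread(() -> {
                try (Socket s = new Socket("localhost", port)) {
                    s.getOutputStream().write("GET / HTTP/1.0\r\n".getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            sender.start();
            String request = new String(readOneRequest(server), StandardCharsets.UTF_8);
            sender.join();
            return request;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip());
    }
}
```

Note that a blocking accept() inside nextTuple() stalls the spout's executor thread while no client is connected; nextTuple() is expected to return quickly, so a non-blocking variant (e.g. accepting on a background thread and emitting from an in-memory queue) would likely behave better under load.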
There are just three abstractions in Apache Storm: spouts, bolts, and topologies. A spout is a source of streams in a computation. Typically a spout reads from a queueing broker such as Kestrel, RabbitMQ, or Kafka, but a spout can also generate its own stream or read from somewhere like the Twitter streaming API.
Apache Storm, released by Twitter, is a distributed open-source framework that helps in the real-time processing of data. Apache Storm works for real-time data just as Hadoop works for batch processing of data (batch processing is the opposite of real-time processing).
The key spout methods are:
nextTuple − Emits the generated data through the collector.
close − This method is called when a spout is going to shut down.
declareOutputFields − Declares the output schema of the tuple.
fail − Specifies that a specific tuple is not processed and not to be reprocessed.
In the constructor, just assign the variables. Instantiate the ServerSocket in the open method (prepare is the bolt equivalent); do not write any new ... in the constructor. And rename your variables: you have two sc variables.
public class ProxySpout extends BaseRichSpout {
    int port;

    public ProxySpout(int port) {
        this.port = port;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        //new ServerSocket
    }

    @Override
    public void nextTuple() {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
If you create the ServerSocket in the open method, it runs only after the spout has already been deployed, so the socket never needs to be serialized; and open is called only once per lifetime of the spout, so this is not inefficient.