I'm new to Scala so the question may be quite simple, though I have spent some time trying to resolve it. I have a simple Scala TCP server (no actors, single thread):
import java.io._
import java.net._
object Application {
def readSocket(socket: Socket): String = {
val bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream))
var request = ""
var line = ""
do {
line = bufferedReader.readLine()
if (line == null) {
println("Stream terminated")
return request
}
request += line + "\n"
} while (line != "")
request
}
def writeSocket(socket: Socket, string: String) {
val out: PrintWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream))
out.println(string)
out.flush()
}
def main(args: Array[String]) {
val port = 8000
val serverSocket = new ServerSocket(port)
while (true) {
val socket = serverSocket.accept()
readSocket(socket)
writeSocket(socket, "HTTP/1.1 200 OK\r\n\r\nOK")
socket.close()
}
}
}
The server listens on localhost:8000
for incomming requests and sends HTTP response with single OK
word in the body. Then I run Apache Benchmark like this:
ab -c 1000 -n 10000 http://localhost:8000/
which works nicely for the first time. The second time I start ab
it hangs producing the following output in netstat -a | grep 8000
:
....
tcp 0 0 localhost.localdo:43709 localhost.localdom:8000 FIN_WAIT2
tcp 0 0 localhost.localdo:43711 localhost.localdom:8000 FIN_WAIT2
tcp 0 0 localhost.localdo:43717 localhost.localdom:8000 FIN_WAIT2
tcp 0 0 localhost.localdo:43777 localhost.localdom:8000 FIN_WAIT2
tcp 0 0 localhost.localdo:43722 localhost.localdom:8000 FIN_WAIT2
tcp 0 0 localhost.localdo:43725 localhost.localdom:8000 FIN_WAIT2
tcp6 0 0 [::]:8000 [::]:* LISTEN
tcp6 83 0 localhost.localdom:8000 localhost.localdo:43724 CLOSE_WAIT
tcp6 83 0 localhost.localdom:8000 localhost.localdo:43786 CLOSE_WAIT
tcp6 1 0 localhost.localdom:8000 localhost.localdo:43679 CLOSE_WAIT
tcp6 83 0 localhost.localdom:8000 localhost.localdo:43735 CLOSE_WAIT
tcp6 83 0 localhost.localdom:8000 localhost.localdo:43757 CLOSE_WAIT
tcp6 83 0 localhost.localdom:8000 localhost.localdo:43754 CLOSE_WAIT
tcp6 83 0 localhost.localdom:8000 localhost.localdo:43723 CLOSE_WAIT
....
Since that no more requests are served by the server. One more detail: The same ab
script with the same parameters works smoothly testing a simple Node.js server on the same machine. So this issue is not related to a number of opened TCP connections which I have set to be reusable with
sudo sysctl -w net.ipv4.tcp_tw_recycle=1
sudo sysctl -w net.ipv4.tcp_tw_reuse=1
Could anyone give me a clue on what I'm missing?
Edit: Termination of stream handling has been added to the code above:
if (line == null) {
println("Stream terminated")
return request
}
I'm posting the (partial) answer to my own question for those who will stumble upon the same issue one day. First, the nature of the problem lies not in the source code but in the system itself which restricts numerious connections. The problem is that the socket
passed to readSocket
function appears corrupted under some conditions, i.e. it can not be read and bufferedReader.readLine()
either returns null
on first call or hangs indefinitely. The following two steps make the code working on some machines:
Increase the number of concurrent connections to a socket with
sysctl -w net.core.somaxconn=65535
Provide the second parameter to ServerSocket
constructor which will explicitly set the length of connection queue:
val maxQueue = 50000
val serverSocket = new ServerSocket(port, maxQueue)
The steps above solve the problem on EC2 m1.large instances, however I'm still getting issues on my local machine. The better way would be to use Akka for the stuff of that kind:
import akka.actor._
import java.net.InetSocketAddress
import akka.util.ByteString
class TCPServer(port: Int) extends Actor {
override def preStart {
IOManager(context.system).listen(new InetSocketAddress(port))
}
def receive = {
case IO.NewClient(server) =>
server.accept()
case IO.Read(rHandle, bytes) => {
val byteString = ByteString("HTTP/1.1 200 OK\r\n\r\nOK")
rHandle.asSocket.write(byteString)
rHandle.close()
}
}
}
object Application {
def main(args: Array[String]) {
val port = 8000
ActorSystem().actorOf(Props(new TCPServer(port)))
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With