I am currently using this library to stress test a Kafka server that I have set up: https://github.com/dsully/pykafka
import kafka
import time

def test_kafka_server(n=1):
    for i in range(0, n):
        producer = kafka.producer.Producer('test', host='10.137.8.192')
        message = kafka.message.Message(str(time.time()))
        producer.send(message)
        producer.disconnect()

def main():
    test_kafka_server(100000)

if __name__ == '__main__':
    main()
What ends up happening is that I overload my own local machine.
I get error 10055, which according to Google means that "Windows has run out of TCP/IP socket buffers because too many connections are open at once." According to netstat, producer.disconnect() is not closing the socket, but rather putting it in the TIME_WAIT state.
The ipython debugger points to this line:
C:\Python27\lib\socket.pyc in meth(name, self, *args)
222 proto = property(lambda self: self._sock.proto, doc="the socket protocol")
223
--> 224 def meth(name,self,*args):
225 return getattr(self._sock,name)(*args)
226
as the culprit, but this then seems to get into messing with things at a lower level than I am comfortable with.
I searched and found the question "Python socket doesn't close connection properly", which recommended doing:
setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
So I rebuilt the pykafka lib using that option in the io.py file:
def connect(self):
    """ Connect to the Kafka server. """
    global socket
    self.socket = socket.socket()
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.connect((self.host, self.port))
and I still get the same error.
Am I not putting the setsockopt line in the right spot? Is there anything else I could be trying?
Calling conn.close() is indeed the correct way to close the connection.
Python takes the automatic shutdown a step further, and says that when a socket is garbage collected, it will automatically do a close if it's needed.
close() call shuts down the socket associated with the socket descriptor socket, and frees resources allocated to the socket. If socket refers to an open TCP connection, the connection is closed. If a stream socket is closed when there is input data queued, the TCP connection is reset rather than being cleanly closed.
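As a small illustration of closing explicitly rather than waiting for garbage collection, here is a minimal sketch. The function name open_and_close is made up for this example, and the check on fileno() assumes Python 3, where a closed socket reports -1 as its descriptor (the question uses Python 2.7, which behaves differently there).

```python
import contextlib
import socket


def open_and_close():
    """Create a socket, close it deterministically, and report its descriptor."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # contextlib.closing calls sock.close() when the block exits,
    # even if the code inside raises an exception.
    with contextlib.closing(sock):
        # A real program would connect and send here.
        descriptor_while_open = sock.fileno()
    # Assumption: Python 3, where fileno() returns -1 once the socket is closed.
    return descriptor_while_open, sock.fileno()
```

The descriptor is released at the end of the with block. That does not stop the kernel from keeping the closed TCP connection in its own table for a while afterwards.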
What you are describing is normal TCP behavior at the socket level. When a user level program closes a socket the kernel does not free the socket right away. It enters TIME_WAIT state:
TIME-WAIT (either server or client) represents waiting for enough time to pass to be sure the remote TCP received the acknowledgment of its connection termination request. According to RFC 793 a connection can stay in TIME-WAIT for a maximum of four minutes, known as two MSL (maximum segment lifetimes).
So the socket is closed. socket.SO_REUSEADDR is meant for listeners (servers) and does not affect client connections; it really only matters when binding the socket.
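Since every connect-and-close cycle leaves one socket in TIME_WAIT, one way to avoid piling them up is to open a single connection and send all the messages over it. The sketch below shows the idea with plain sockets against a throwaway local listener. The helper names (start_listener, serve_one_client, send_many, demo) are made up for this illustration and are not part of pykafka. It also shows where SO_REUSEADDR belongs: on the listening socket, before bind().

```python
import socket
import threading


def start_listener():
    """Start a throwaway local TCP listener and return its socket."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # SO_REUSEADDR is set on the listening side, before bind().
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    server.listen(5)
    return server


def serve_one_client(server, received):
    """Accept one connection and collect bytes until the client closes."""
    conn, _ = server.accept()
    try:
        while True:
            chunk = conn.recv(4096)
            if not chunk:
                break
            received.append(chunk)
    finally:
        conn.close()


def send_many(address, count):
    """Send `count` messages over ONE connection instead of reconnecting."""
    client = socket.create_connection(address)
    try:
        for i in range(count):
            client.sendall(("message %d\n" % i).encode("ascii"))
    finally:
        client.close()  # a single close, so a single connection to wind down


def demo(count=1000):
    """Run the listener and the client; return how many messages arrived."""
    server = start_listener()
    received = []
    worker = threading.Thread(target=serve_one_client, args=(server, received))
    worker.start()
    send_many(server.getsockname(), count)
    worker.join()
    server.close()
    return b"".join(received).count(b"\n")
```

Applied to the question's script, the equivalent change would presumably be to create the Producer once before the loop and call disconnect() once after it. This assumes the Producer keeps its connection open across send() calls, which I have not verified against the library.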