
Python UDP socket send bottleneck (slow/delays randomly)

Python UDP Streamer with hiccups in sending

I'm currently developing a network streaming app in Python 3.4 (it should stay 3.3-compatible if possible), and I'm seeing some strange behavior with my socket.
Definition: whenever I talk about a stream, a UDP stream is meant.

The problem

While sending, the socket.send call sometimes starts to take 1-3 ms, and as I describe below, the transfer target is much higher. I found other threads here about send speed, but they managed to send 200k packets per second because they only sent "A". In my case each packet is 1500 bytes including the UDP and IP headers added by the socket. Please see my explanations below if the problem isn't clear at this point.

Question

Does anyone have an idea why these delays happen, or how to speed up sending to reach real time?

My test code looks like this:

def _transfer(self):
    self.total_num_samps_sent = 0
    self.sequence_out = 0
    self.send_in_progress = True
    send = self.udp_socket.send  # local alias to avoid the attribute lookup in the hot loop
    for i in range(len(self.packed_streams)):
        stream_data, stream_samps, stream_seq = self.packed_streams[i]
        # commit the samples
        start_try_send_time = monotonic()
        while not self.ready():
            if monotonic() - start_try_send_time > self.timeout > 0:
                # timeout reached; if self.timeout == 0, wait forever
                return 0
        self.sequence_out = stream_seq
        # ######################
        # Here is the bottleneck
        # ######################
        s = monotonic()
        send(stream_data)
        e = monotonic()
        if e-s > 0:
            print(str(i) + ': ' + str(e-s))
        # #####################
        # end measure monotonic
        # #####################
    self.total_num_samps_sent += stream_samps
    self.send_in_progress = False

self.packed_streams contains a list of tuples (data_as_bytes, number_samples_in_this_stream, sequence_out). The function self.ready() returns True when the target has ACKed enough of the sent packets (i.e. has free RAM).
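
For illustration only, the packed list could be built roughly like this; the header layout, samples-per-packet value and 32-bit sample format are assumptions, not the real packing code:

import struct

def build_packed_streams(samples, samples_per_packet=363):
    # hypothetical packing: 12-byte header + unsigned 32-bit samples per packet
    packed_streams = []
    sequence_out = 0
    for offset in range(0, len(samples), samples_per_packet):
        chunk = samples[offset:offset + samples_per_packet]
        sequence_out += 1
        header = struct.pack('>III', sequence_out, len(chunk), 0)
        payload = struct.pack('>%dI' % len(chunk), *chunk)
        packed_streams.append((header + payload, len(chunk), sequence_out))
    return packed_streams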

The specially marked bottleneck is profiled in more detail further down.

The socket creation looks like:

self.target = (str(self.ip_target), port)
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.settimeout(self.socket_timeout)
try:
    self.udp_socket.bind((str(self.ip_own), 0))
except OSError as os_error:
    error = ('OS Error: {0}'.format(os_error)
             + linesep + 'IP src: ' + str(self.ip_own)
             + linesep + 'IP dst: ' + str(self.ip_usrp)
             + linesep + 'Port: {0}'.format(port))
    exit(error)
self.udp_socket.connect(self.target)
# setting the socket to non-blocking does not help either:
# self.udp_socket.setblocking(False) 

The send function (first code block) runs as a separate thread, and the UDPFlowControl spawns another thread too, running on the same socket as the send streamer (the streamer inherits from the flow control and uses its ready state).

UDPFlowControl

def _worker(self):
    """
    * Receive Loop
    * - update flow control condition count
    * - put async message packets into queue
    """
    self.send_here_am_i()
    while 1:
        ready = select([self.udp_socket], [], [], self.socket_timeout)
        if ready[0]:
            try:
                data_in = self.udp_socket.recv(2048)
            except:
                # ignore timeout/error buffers
                continue
            # with suppress(Exception):  #ToDo Reenable after test is done
            bytes_in = len(data_in)
            self.data_received += bytes_in
            # extract the vrt header packet info
            vrt = VRTImplementation()
            vrt.num_packet_words32 = int(bytes_in / ctypes.sizeof(ctypes.c_uint32))
            if not vrt.unpack_header(data_in, VRTEndian.BIG_ENDIAN):
                continue
            # handle a tx async report message
            if vrt.stream_id32 == Defaults.ASYNC_SID and vrt.packet_type != PacketType.DATA:
                # fill in the async metadata
                metadata = MetadataAsync()
                metadata.load_from_vrt(vrt, data_in[vrt.num_header_words32 * 4:],
                                       self.tick_rate)
                # catch the flow control packets and react
                if metadata.event_code == EventCode.FLOW_CONTROL:
                    self.sequence_in = \
                        unpack('>I', data_in[vrt.num_header_words32 * 4 + 4:vrt.num_header_words32 * 4 + 8])[0]
                    continue
                self.async_msg_fifo.append(metadata)
            else:
                # TODO: unknown packet
                pass

def ready(self):
    """
    Check whether fewer ACKs are outstanding than the maximum allowed
    :returns bool: True if the device can accept more data
    """
    return self.sequence_out - self.sequence_in < self.max_sequence_out

cProfile

<< Removed old benchmark >> see the edit history if you need this information again.

As mentioned above, the monotonic profiling is the reason for my question. As you can see, times of 0 are ignored. The output looks like this (the stream contains 5 seconds of data, 2754.8 packets per second to send, each with a resulting size of 1500 bytes according to Wireshark):

Send:  445.40K of    5.00M, Sending:  True @ monotonic time:   44927.0550
1227: 0.01599999999598367
1499: 0.01599999999598367
1740: 0.014999999999417923
1883: 0.01600000000325963
Send:  724.18K of    5.00M, Sending:  True @ monotonic time:   44927.3200
....

The first number is the index of the delayed packet. The second number is the monotonic time difference of this delay. Not shown here, but in my log I found timings like 7582: 0.030999999995401595 and sometimes much higher, around 0.06.

The lines starting with Send are the main thread writing the current state to the console. After writing, it sleeps for 250 ms.

My problem is that the system currently runs at only 1/25 of the target speed and already shows these hiccups; as you can see in cProfile, it takes nearly 30 seconds to send a 5-second stream. The target speed would be 68870 packets/s at 1500 bytes each, which is ~98.5 MByte/s including overhead on GbE (125 MByte/s limit).

This is a single-target application, normally attached directly to the device over a network cable without any router, switch or anything else in between, so the network belongs only to this app and the device.

What I have done so far:

  • As you can see in the code, I have reduced the test to a minimum; the streams are already in memory, ready to be transferred to the device. No conversion is required any more, they only need to be pushed into the socket.
  • Tested with select whether the sending socket is ready, started monotonic, threw the data into the socket, stopped monotonic and looked at the results (see the sketch after this list).
  • Checked the network with Wireshark (of 13774 send calls, 13774 appear in Wireshark; I count ~1310 hiccups).
  • Thought about the GIL as the reason, but it is hard to pin down.
  • Turned off the firewall while testing - no change.
  • [Edit 1] Tested in C++ with Boost whether the socket can perform at the target speed; it has hiccups too, but they are much shorter, 100-1000 µs (which the 1 MB buffer in the device can handle).
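
A minimal sketch of the select-based send test mentioned in the list above; the payload contents, target address and packet count are placeholders, not real data:

import socket
from select import select
from time import monotonic

payload = b'\x00' * 1464                  # roughly one 1500-byte frame after headers
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(('192.168.10.2', 49152))     # assumed device address/port

for i in range(13774):
    # wait until the OS reports the socket as writable
    _, writable, _ = select([], [sock], [], 1.0)
    if not writable:
        continue
    s = monotonic()
    sock.send(payload)
    e = monotonic()
    if e - s > 0:
        print(i, e - s)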

In all tests, keep in mind that the print command is only there for debugging; half of the monotonic calls serve debugging purposes too.

<< Removed old benchmark >> see the edit history if you need this information again.

Running on Windows 7 x64 with Python 3.4.2 on a Core i7-2630QM with 8 GB RAM.

<< Removed old benchmark >> see the edit history if you need this information again.

Edit 3

First, because I can answer this quickly: cProfile runs inside the thread. The _worker is still an unprofiled second thread; because of the low time spent waiting to be ready (~0.05 s in total) I assumed it runs fast enough. The _send function is the thread entry point and mostly a wrapper so that this thread can be profiled with cProfile.

def _send(self):
    profile = cProfile.Profile()
    profile.enable()
    self._transfer()
    profile.disable()
    profile.print_stats()

Disabling the timeouts and rerunning the profiling will have to wait one or two days; I am currently cleaning up the code, because there are still threads left in the background sitting in a suspended state (with 250 ms sleeps). I think it's not a problem to let them die and respawn on usage. When this is done I will retry the tests. The more I think about it, the more the GIL looks like the evil here. Possibly it's the unpacking of incoming packets within the flow control, together with the switching between threads, that takes some time and causes these hiccups. (If I understand the GIL correctly, only one thread can execute Python code at a time, but I wonder why this always hits the socket action instead of splitting between the ready and send calls in a more equal way, like 40/60 or 50/50.) So the futures package is on my to-do list to get real multi-core use with processes. To test this I will make ready() return True permanently and have the FlowControl thread either not start or return on its first statement.
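
Since the futures package is on the to-do list, here is a rough sketch of how the VRT unpacking could be moved into a worker process so it no longer competes with send() for the GIL. The unpack_vrt_sequence helper, the byte offset and the callback protocol are assumptions, not the real VRT parsing:

from concurrent.futures import ProcessPoolExecutor
from struct import unpack

def unpack_vrt_sequence(datagram):
    """Stand-in for the VRT header parsing: return the ACKed sequence or None."""
    if len(datagram) < 24:
        return None
    return unpack('>I', datagram[20:24])[0]   # byte offset is an assumption

def flow_control_loop(udp_socket, on_sequence):
    """Receive loop; the parsing work is pushed to another core."""
    with ProcessPoolExecutor(max_workers=1) as pool:
        while True:
            data_in = udp_socket.recv(2048)
            future = pool.submit(unpack_vrt_sequence, data_in)
            future.add_done_callback(lambda f: on_sequence(f.result()))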

The target of this program is to run on Linux, Windows, Mac and Unix.

Edit 4

First, about threads: they have no priority, as mentioned here: Controlling scheduling priority of python threads? I believe there is no way to change that. The core Python runs on is at 25% max, and the overall system load is around 10% while the debugger runs.

The run with select was only a test. I removed the select code from the send routine and tested with and without timeouts:

<< Removed old benchmark >> see the edit history if you need this information again.

Thread-cleaned example of the old code

In this example I killed all threads instead of putting them to sleep, and the main thread sleeps for a longer time. Without FlowControl @ 5M:

         41331 function calls in 2.935 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    2.007    2.007    2.935    2.935 SendStreamer.py:297(_transfer)
 13776    0.005    0.000    0.005    0.000 UDPFlowControl.py:52(ready)
     1    0.000    0.000    0.000    0.000 {built-in method len}
 13776    0.007    0.000    0.007    0.000 {built-in method monotonic}
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
 13776    0.915    0.000    0.915    0.000 {method 'send' of '_socket.socket' objects}

With FlowControl @ 5M

Here it spends more time waiting for the device than in send.

            68873 function calls in 5.245 seconds

Ordered by: standard name

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1    4.210    4.210    5.245    5.245 SendStreamer.py:297(_transfer)
 27547    0.030    0.000    0.030    0.000 UDPFlowControl.py:52(ready)
     1    0.000    0.000    0.000    0.000 {built-in method len}
 27547    0.011    0.000    0.011    0.000 {built-in method monotonic}
     1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
 13776    0.993    0.000    0.993    0.000 {method 'send' of '_socket.socket' objects}

Still open: split up into processes. I am still refactoring the class structures towards process usage (I think by the end of May at the latest I will have some new results to add). During some more detailed benchmarking I found out that the second thread (VRT unpacking) takes nearly as long as each hiccup lasts. With processes this should no longer be a possible reason for the slowdowns.

I hope all the required information is here; if I forgot something, please ask!

[Edit 1] Added information to the "what I have done so far" list

[Edit 2] Added cProfiles of the 2nd test system (Manjaro)

[Edit 3] Added information about how cProfile runs

[Edit 4] More cProfiles + answer about threads

[Edit 5] Removed old benchmarks

Asked Mar 04 '15 by Jan


1 Answer

I can confirm this on Linux, run as an unprivileged user, with Python 2.

I don't think there's much you can do:

# timing code (assumes contextlib, time and math are imported and data = {} is defined):
In [16]: @contextlib.contextmanager
   ....: def timeit():
   ....:     st = time.time()
   ....:     yield
   ....:     en = time.time()
   ....:     b = int(math.log10(en - st))
   ....:     data.setdefault(b, 0)
   ....:     data[b] += 1

# Thus, timing data means:
-6: number of times send took between 0.00000011 and 0.000001s
-5: 0.0000011 ~ 0.00001
-4: 0.000011 ~ 0.0001
-3: 0.00011 ~ 0.001 (up to a millisecond)
-2: 0.0011 ~ 0.01 (1..10ms)

# Regular blocking socket
{-6: 2807, -5: 992126, -4: 5049, -3: 18}
# Non-blocking socket
{-6: 3242, -5: 991767, -4: 4970, -3: 20, -2: 1}
# socket with timeout=0
{-6: 2249, -5: 992994, -4: 4749, -3: 8}
# socket with timeout=1
{-5: 994259, -4: 5727, -3: 8, -2: 6}
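
For context, a minimal standalone harness that could produce histograms like the ones above might look as follows; the receiver address, payload size and iteration count are assumptions, and a receiver is expected at that address:

import contextlib
import math
import socket
import time

data = {}

@contextlib.contextmanager
def timeit():
    # same bucketing as above (int(log10(elapsed)) -> count),
    # with a small guard against a zero reading on coarse clocks
    st = time.time()
    yield
    en = time.time()
    b = int(math.log10(max(en - st, 1e-9)))
    data.setdefault(b, 0)
    data[b] += 1

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.connect(('192.168.10.2', 49152))   # assumed receiver address/port
payload = b'\x00' * 1472                # fills a 1500-byte IP packet

for _ in range(1000000):
    with timeit():
        sock.send(payload)

print(data)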

It looks like the tail of this distribution is exponential.

I also tried a larger send buffer and adding an occasional time.sleep to give the kernel time to send the queued packets, and that didn't help (see the sketch below). That makes sense, since the non-blocking socket also gets the occasional slow send.
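
The "larger send buffer" attempt could look roughly like this; the 4 MB value is arbitrary, and on Linux the kernel may clamp it to net.core.wmem_max:

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4 * 1024 * 1024)
# the kernel reports the buffer size it actually granted
print('effective send buffer:', sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))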

I also tried waiting explicitly for the send queue to be empty, per the outq function at http://www.pycopia.net/_modules/pycopia/socket.html, and that didn't change the distribution either.
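
For reference, a Linux-only way to check how many bytes are still queued in the kernel send buffer, similar to pycopia's outq; the SIOCOUTQ constant comes from <linux/sockios.h> and this will not work on Windows:

import fcntl
import struct

SIOCOUTQ = 0x5411   # Linux ioctl: bytes not yet sent out of the socket buffer

def outq(sock):
    # returns the number of bytes still queued in the kernel for this socket
    buf = fcntl.ioctl(sock.fileno(), SIOCOUTQ, struct.pack('I', 0))
    return struct.unpack('I', buf)[0]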

Answered Nov 15 '22 by Dima Tisnek