Separate computation from socket work in Python

I'm serializing column data and then sending it over a socket connection. Something like:

import array, struct, socket

## Socket setup
s = socket.create_connection((ip, port))

## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)

for i in range(num_of_chunks):
    ## Binarize data
    columns['col1'] = array.array('i', range(10000))
    columns['col2'] = array.array('f', [float(num) for num in range(10000)])
    .
    .
    .

    ## Send away
    chunk = b''.join(columns[col_name] for col_name in ordered_col_list)
    s.sendall(chunk)
    s.recv(1000)      #get confirmation

I wish to separate the computation from the sending, put them on separate threads or processes, so I can keep doing computations while data is sent away.

I've put the binarizing part as a generator function, then sent the generator to a separate thread, which then yielded binary chunks via a queue.

I collected the data from the main thread and sent it away. Something like:

import array, struct, socket
from time import sleep
try:
    import thread
    from Queue import Queue
except ImportError:
    import _thread as thread
    from queue import Queue


## Socket and queue setup
s = socket.create_connection((ip, port))
chunk_queue = Queue()


def binarize(num_of_chunks):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)

    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        .
        .

        yield b''.join((columns[col_name] for col_name in ordered_col_list))


def chunk_yielder(queue):
    ''' Generate binary chunks and put them on a queue. To be used from a thread '''

    while True:   
        try:
            data_gen = queue.get_nowait()
        except:
            sleep(0.1)
            continue
        else:    
            for chunk in data_gen:
                queue.put(chunk)


## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)


## Get data back and send away
while True:
    try:
        binary_chunk = queue.get_nowait()
    except:
        sleep(0.1)
        continue
    else:
        socket.sendall(binary_chunk)
        socket.recv(1000) #Get confirmation

However, I did not see any performance improvement - it did not work faster.

I don't understand threads/processes too well, and my question is whether it is possible (at all, and in Python) to gain from this type of separation, and what would be a good way to go about it, either with threads or processes (or any other way - async etc.).

EDIT:

As far as I've come to understand -

  1. Multiprocessing requires serializing any data passed between processes, so I would effectively be sending every computed chunk twice.
  2. Sending via socket.send() should release the GIL

Therefore I think (please correct me if I am mistaken) that a threading solution is the right way. However, I'm not sure how to do it correctly.

I know Cython can release the GIL inside threads, but since one of the threads would just be doing socket.send/recv, my understanding is that this shouldn't be necessary here.
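
For reference, this is roughly the structure I have in mind for a threaded version (untested sketch, reusing the binarize generator from above; produce_chunks and the bounded queue size are just my own naming/choice):

import socket
import threading
from queue import Queue

def produce_chunks(chunk_queue, num_of_chunks):
    ''' Worker thread: compute binary chunks and put them on the queue '''
    for chunk in binarize(num_of_chunks):
        chunk_queue.put(chunk)      # blocks if the queue is full
    chunk_queue.put(None)           # sentinel: no more data

def send_chunks(sock, chunk_queue):
    ''' Main thread: pull chunks off the queue and send them '''
    while True:
        chunk = chunk_queue.get()   # blocks until a chunk is ready
        if chunk is None:
            break
        sock.sendall(chunk)
        sock.recv(1000)             # get confirmation

s = socket.create_connection((ip, port))
chunk_queue = Queue(maxsize=10)     # bounded so the producer can't run far ahead
worker = threading.Thread(target=produce_chunks, args=(chunk_queue, 100))
worker.start()
send_chunks(s, chunk_queue)
worker.join()

My thinking is that blocking get()/put() calls would avoid the sleep-and-retry polling, but I'm not sure whether this is the correct structure.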

asked Jan 08 '18 by Jay



2 Answers

You have two options for running things in parallel in Python: either use the multiprocessing library (docs), or write the parallel code in Cython and release the GIL. The latter is significantly more work and less applicable generally speaking.

Python threads are limited by the Global Interpreter Lock (GIL); I won't go into detail here, as you will find more than enough information on it online. In short, the GIL, as the name suggests, is a global lock within the CPython interpreter that ensures multiple threads do not simultaneously modify objects that live inside that interpreter. This is why Cython programs, for instance, can run code in parallel: they are able to operate outside the GIL.
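
To see this concretely, here is a quick illustrative sketch (not taken from your code): two CPU-bound threads take roughly as long as doing the same work twice serially, because only one thread can hold the GIL at a time.

import threading, time

def crunch():
    # pure-Python CPU-bound work; never releases the GIL
    total = 0
    for i in range(10000000):
        total += i * i
    return total

start = time.time()
crunch(); crunch()
print('serial:  %.2f s' % (time.time() - start))

start = time.time()
threads = [threading.Thread(target=crunch) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()
print('threads: %.2f s' % (time.time() - start))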


As to your code, one problem is that you're running both the number crunching (binarize) and the socket.send inside the GIL, which runs them strictly serially. The queue is also connected very strangely, and there is a NameError, but let's leave those aside.

With the caveats already pointed out by Jeremy Friesner in mind, I suggest you re-structure the code in the following manner: you have two processes (not threads), one for binarising the data and the other for sending it. In addition to those, there is also the parent process that started both children, and a queue connecting child 1 to child 2.

  • Subprocess-1 does number crunching and produces crunched data into a queue
  • Subprocess-2 consumes data from a queue and does socket.send

In code, the setup would look something like this:

from multiprocessing import Process, Queue

work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()

binarize can remain as it is in your code, except that instead of yielding at the end, you add elements into the queue:

def binarize(num_of_chunks, q):
    ''' Compute chunks of binary data and put them on the queue. In reality it wouldn't be the same data '''

    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        data = b''.join((columns[col_name] for col_name in ordered_col_list))
        q.put(data)

send_data should just be the while loop from the bottom of your code, with the connection open/close functionality:

def send_data(ip, port, q):
    s = socket.create_connection((ip, port))
    while True:
        try:
            binary_chunk = q.get(False)
        except:
            sleep(0.1)
            continue
        else:
            s.sendall(binary_chunk)
            s.recv(1000)  # Get confirmation
    # maybe remember to close the socket before killing the process

Now you have two (three, actually, if you count the parent) processes that are processing data independently. You can force the two processes to synchronise their operations by setting the maxsize of the queue to a single element. The operation of these two separate processes is also easy to monitor from the process manager on your computer: top (Linux), Activity Monitor (macOS), or Task Manager (Windows).
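
For example, constructing the queue as

work_queue = Queue(maxsize=1)  # binarize blocks on q.put() until send_data has taken the previous chunk

means neither process can get more than one chunk ahead of the other.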


Finally, Python 3 comes with the option of using co-routines, which are neither processes nor threads but something else entirely. Co-routines are pretty cool from a CS point of view, but a bit of a head scratcher at first. There are plenty of resources to learn from though, like this post on Medium and this talk by David Beazley.
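
As a minimal sketch (assuming Python 3.5+, reusing the generator version of binarize from the question, and bearing in mind that the CPU-bound binarize still blocks the event loop, so this mainly tidies up the I/O side), the same producer/consumer shape could look like this with asyncio:

import asyncio

async def produce(queue, num_of_chunks):
    for chunk in binarize(num_of_chunks):   # generator version of binarize from the question
        await queue.put(chunk)              # waits if the queue is full
    await queue.put(None)                   # sentinel: no more data

async def consume(queue, ip, port):
    reader, writer = await asyncio.open_connection(ip, port)
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        writer.write(chunk)
        await writer.drain()
        await reader.read(1000)             # get confirmation
    writer.close()

async def main(ip, port):
    queue = asyncio.Queue(maxsize=10)
    await asyncio.gather(produce(queue, 100), consume(queue, ip, port))

loop = asyncio.get_event_loop()
loop.run_until_complete(main(ip, port))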


Even more generally, you might want to look into the producer/consumer pattern, if you are not already familiar with it.

answered Oct 27 '22 by Matti Lyra


If you are trying to use concurrency to improve performance in CPython, I would strongly recommend using the multiprocessing library instead of multithreading. This is because of the GIL (Global Interpreter Lock), which can have a huge impact on execution speed (in some cases, it may cause your code to run slower than the single-threaded version). Also, if you would like to learn more about this topic, I recommend reading this presentation by David Beazley. Multiprocessing sidesteps this problem by spawning a new Python interpreter instance for each process, allowing you to take full advantage of a multi-core architecture.
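
As a minimal illustrative sketch, a CPU-bound function can be spread over all cores with multiprocessing.Pool, whereas the same work split across threads would be serialised by the GIL:

from multiprocessing import Pool

def crunch(n):
    # pure-Python CPU-bound work
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    with Pool() as pool:                        # one worker process per CPU core by default
        results = pool.map(crunch, [10000000] * 4)
    print(results)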

answered Oct 27 '22 by Paweł P.