I'm serializing column data and then sending it over a socket connection. Something like:
import array, struct, socket
## Socket setup
s = socket.create_connection((ip, addr))
## Data container setup
ordered_col_list = ('col1', 'col2')
columns = dict.fromkeys(ordered_col_list)
for i in range(num_of_chunks):
    ## Binarize data
    columns['col1'] = array.array('i', range(10000))
    columns['col2'] = array.array('f', [float(num) for num in range(10000)])
    .
    .
    .
    ## Send away
    chunk = b''.join(columns[col_name] for col_name in ordered_col_list)
    s.sendall(chunk)
    s.recv(1000) #get confirmation
I wish to separate the computation from the sending, put them on separate threads or processes, so I can keep doing computations while data is sent away.
I've put the binarizing part as a generator function, then sent the generator to a separate thread, which then yielded binary chunks via a queue.
I collected the data from the main thread and sent it away. Something like:
import array, struct, socket
from time import sleep
try:
    import thread
    from Queue import Queue
except:
    import _thread as thread
    from queue import Queue
## Socket and queue setup
s = socket.create_connection((ip, addr))
chunk_queue = Queue()
def binarize(num_of_chunks):
    ''' Generator function that yields chunks of binary data. In reality it wouldn't be the same data'''
    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        columns['col1'] = array.array('i', range(10000)).tostring()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tostring()
        .
        .
        yield b''.join((columns[col_name] for col_name in ordered_col_list))
def chunk_yielder(queue):
    ''' Generate binary chunks and put them on a queue. To be used from a thread '''
    while True:
        try:
            data_gen = queue.get_nowait()
        except:
            sleep(0.1)
            continue
        else:
            for chunk in data_gen:
                queue.put(chunk)
## Setup thread and data generator
thread.start_new_thread(chunk_yielder, (chunk_queue,))
num_of_chunks = 100
data_gen = binarize(num_of_chunks)
queue.put(data_gen)
## Get data back and send away
while True:
    try:
        binary_chunk = queue.get_nowait()
    except:
        sleep(0.1)
        continue
    else:
        socket.sendall(binary_chunk)
        socket.recv(1000) #Get confirmation
However, I did not see any performance improvement: it did not run faster.
I don't understand threads/processes too well. My question is whether it is possible (at all, and in Python) to gain from this type of separation, and what would be a good way to go about it, either with threads or processes (or any other way, such as async).
EDIT:
As far as I've come to understand, socket.send() should release the GIL. Therefore I think (please correct me if I am mistaken) that a threading solution is the right way. However, I'm not sure how to do it correctly.
I know cython can release the GIL off of threads, but since one of them is just socket.send/recv, my understanding is that it shouldn't be necessary.
You can send and receive on the same socket at the same time (via multiple threads). But the send and receive may not actually occur simultaneously, since one operation may block the other from starting until it's done.
You have two options for running Python code in parallel: either use the multiprocessing library (docs), or write the parallel code in Cython and release the GIL. The latter is significantly more work and less generally applicable.
Python threads are limited by the Global Interpreter Lock (GIL). I won't go into detail here, as you will find more than enough information on it online. In short, the GIL, as the name suggests, is a global lock within the CPython interpreter that ensures multiple threads do not simultaneously modify objects that live within that interpreter. This is why, for instance, Cython code can run in parallel: it can release the GIL and work outside it.
As to your code, one problem is that both the number crunching (binarize) and the loop around socket.send run under the GIL, so the Python-level work in the two threads is serialized. The queue is also connected very strangely, and there is a NameError, but let's leave those aside.
With the caveats already pointed out by Jeremy Friesner in mind, I suggest you restructure the code in the following manner: you have two processes (not threads), one for binarising the data and the other for sending it. In addition to those, there is also the parent process that started both children, and a queue connecting child 1 to child 2.
In code, the setup would look something like:
from multiprocessing import Process, Queue
work_queue = Queue()
p1 = Process(target=binarize, args=(100, work_queue))
p2 = Process(target=send_data, args=(ip, port, work_queue))
p1.start()
p2.start()
p1.join()
p2.join()
binarize can remain as it is in your code, with the exception that instead of a yield at the end, you add elements to the queue:
import array
def binarize(num_of_chunks, q):
    ''' Put chunks of binary data on the queue q. In reality it wouldn't be the same data'''
    ordered_col_list = ('col1', 'col2')
    columns = dict.fromkeys(ordered_col_list)
    for i in range(num_of_chunks):
        # tobytes() replaces tostring(), which was removed in Python 3.9
        columns['col1'] = array.array('i', range(10000)).tobytes()
        columns['col2'] = array.array('f', [float(num) for num in range(10000)]).tobytes()
        data = b''.join((columns[col_name] for col_name in ordered_col_list))
        q.put(data)
send_data should just be the while loop from the bottom of your code, with the connection open/close functionality:
import socket
from time import sleep
from queue import Empty  # raised by q.get(False) when the queue is empty
def send_data(ip, port, q):
    s = socket.create_connection((ip, port))
    while True:
        try:
            binary_chunk = q.get(False)
        except Empty:
            sleep(0.1)
            continue
        else:
            s.sendall(binary_chunk)
            s.recv(1000) # Get confirmation
    # maybe remember to close the socket before killing the process
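As written, the sending loop never ends, so the process has to be killed from outside. One possible variation, sketched below under some assumptions, is to block on q.get() instead of polling and to stop when an end-of-stream marker arrives. The SENTINEL marker, the send_chunks and recv_exact names, and the fake_receiver stand-in for the remote end are all hypothetical and not part of the code above. A queue.Queue, a thread, and socket.socketpair() are used only so the sketch runs on its own in one process; the same loop should work with a multiprocessing.Queue, since get() has the same interface there.

```python
import queue
import socket
import threading

SENTINEL = None  # hypothetical end-of-stream marker, not in the original code

def send_chunks(sock, q):
    """Send chunks from q until the sentinel arrives; return how many were sent."""
    sent = 0
    while True:
        chunk = q.get()            # blocks until an item is available, no polling
        if chunk is SENTINEL:
            break
        sock.sendall(chunk)
        sock.recv(1000)            # wait for the receiver's confirmation
        sent += 1
    sock.close()                   # close the socket once the stream is finished
    return sent

def recv_exact(sock, n):
    """Read up to n bytes, returning fewer only if the peer closed the socket."""
    buf = bytearray()
    while len(buf) < n:
        part = sock.recv(n - len(buf))
        if not part:
            break
        buf.extend(part)
    return bytes(buf)

def fake_receiver(sock, chunk_size, received):
    """Stand-in for the remote end: read fixed-size chunks and acknowledge each."""
    while True:
        data = recv_exact(sock, chunk_size)
        if len(data) < chunk_size:
            break                  # sender closed the connection
        received.append(data)
        sock.sendall(b"ok")
    sock.close()

def demo():
    a, b = socket.socketpair()     # connected pair of local sockets
    received = []
    t = threading.Thread(target=fake_receiver, args=(b, 4, received))
    t.start()
    q = queue.Queue()
    for chunk in (b"aaaa", b"bbbb", b"cccc"):
        q.put(chunk)
    q.put(SENTINEL)                # tell the sender there is nothing more to send
    sent = send_chunks(a, q)
    t.join()
    return sent, received
```

With this shape, the producer signals completion by putting the sentinel on the queue after its last chunk, and joining the sender then returns instead of hanging.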
Now you have two (three, if you count the parent) processes that are processing data independently. You can force the two processes to synchronise their operations by setting the maxsize of the queue to a single element. The operation of these two separate processes is also easy to monitor from the process manager on your computer: top (Linux), Activity Monitor (macOS), or Task Manager (Windows).
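To make the effect of a one-element queue concrete, here is a small sketch. The keyword argument is spelled maxsize, and multiprocessing.Queue accepts it as well; queue.Queue is used here only so the snippet runs in a single process. The try_put helper is a hypothetical name introduced for illustration.

```python
import queue

def try_put(q, item):
    """Return True if item was queued, False if the queue was already full."""
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        return False

q = queue.Queue(maxsize=1)        # room for a single waiting chunk
first = try_put(q, b"chunk-0")    # queue was empty, so this succeeds
second = try_put(q, b"chunk-1")   # queue is full until chunk-0 is taken
taken = q.get()                   # the consumer removes chunk-0
third = try_put(q, b"chunk-1")    # there is room again
```

A plain q.put(item) would block at the second call instead of failing, which is what keeps the producer from running more than one chunk ahead of the sender.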
Finally, Python 3 comes with the option of using coroutines, which are neither processes nor threads, but something else entirely. Coroutines are pretty cool from a CS point of view, but a bit of a head scratcher at first. There are plenty of resources to learn from though, like this post on Medium and this talk by David Beazley.
Even more generally, you might want to look into the producer/consumer pattern, if you are not already familiar with it.
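As a rough illustration of the producer/consumer pattern, the sketch below has one side build binary chunks while the other consumes them through a bounded queue. It uses threads and queue.Queue purely to stay self-contained; the produce, consume, and run names, the None end marker, and the chunk length of 1000 elements are assumptions made for the example, and the consumer only records chunk sizes where real code would send the data.

```python
import array
import queue
import threading

def produce(num_chunks, q, n=1000):
    """Build num_chunks binary chunks and put them on q, then signal the end."""
    for _ in range(num_chunks):
        ints = array.array('i', range(n)).tobytes()
        floats = array.array('f', (float(x) for x in range(n))).tobytes()
        q.put(ints + floats)       # blocks while the queue is full
    q.put(None)                    # hypothetical end-of-stream marker

def consume(q, sizes):
    """Take chunks off q until the end marker; record each chunk's size."""
    while True:
        chunk = q.get()            # blocks until the producer delivers something
        if chunk is None:
            break
        sizes.append(len(chunk))   # real code would send the chunk here

def run(num_chunks=5):
    q = queue.Queue(maxsize=2)     # producer may run at most two chunks ahead
    sizes = []
    consumer = threading.Thread(target=consume, args=(q, sizes))
    consumer.start()
    produce(num_chunks, q)
    consumer.join()
    return sizes
```

The same structure carries over to two processes by swapping in multiprocessing.Process and multiprocessing.Queue, which is where CPU-bound producers would be expected to benefit.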
If you are trying to use concurrency to improve performance in CPython, I would strongly recommend using the multiprocessing library instead of multithreading. This is because of the GIL (Global Interpreter Lock), which can have a huge impact on execution speed (in some cases, it may cause your code to run slower than a single-threaded version). If you would like to learn more about this topic, I recommend reading this presentation by David Beazley. Multiprocessing bypasses this problem by spawning a new Python interpreter instance for each process, thus allowing you to take full advantage of a multi-core architecture.