
Python performance - best parallelism approach


I am implementing a Python script that needs to keep sending 1500+ packets in parallel, each within less than 5 seconds.

In a nutshell what I need is:

```python
import time
from random import randint

def send_pkts(ip):
    # craft packet
    while True:
        # send packet
        time.sleep(randint(0, 3))

for x in list[:1500]:
    send_pkts(x)
    time.sleep(randint(1, 5))
```

I have tried simple single-threaded, multithreading, multiprocessing, and multiprocessing+multithreading approaches and ran into the following issues:

  1. Simple single-threaded: the delays in the "for" loop seem to break the "5 seconds" requirement.
  2. Multithreading: I think I could not accomplish what I want because of Python's GIL limitations.
  3. Multiprocessing: this was the approach that seemed to work best. However, due to the excessive number of processes, the VM where I run the script freezes (of course: 1500 processes running). It is therefore impractical.
  4. Multiprocessing + multithreading: in this approach I created fewer processes, each of them starting some threads (let's say 10 processes with 150 threads each). The VM clearly does not freeze as fast as with approach 3, but the most "concurrent packet sending" I could reach was ~800. GIL limitations? VM limitations? In this attempt I also tried using a process pool, but the results were similar.

Is there a better approach I could use to accomplish this task?

[1] EDIT 1:

```python
import gevent

def send_pkt(x):
    # craft pkt
    while True:
        # send pkt
        gevent.sleep(0)

gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])
```

[2] EDIT 2 (gevent monkey-patching):

```python
from gevent import monkey; monkey.patch_all()
import gevent

jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
# for send_pkt(x) check [1]
```

However, I got the following error: "ValueError: filedescriptor out of range in select()". So I checked my system ulimit (soft and hard are both at the maximum: 65536). I then found that it has to do with a select() limitation on Linux (a maximum of 1024 fds). Please check http://man7.org/linux/man-pages/man2/select.2.html (BUGS section). In order to overcome that I should use poll() (http://man7.org/linux/man-pages/man2/poll.2.html) instead, but with poll() I return to the same limitations, as polling is a "blocking approach".
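One way to sidestep select()'s 1024-fd FD_SETSIZE cap without giving up readiness polling is the stdlib selectors module, whose DefaultSelector transparently uses epoll on Linux. A minimal sketch, assuming an epoll-capable kernel (the UDP sockets and counts here are illustrative, not from the question):

```python
import selectors
import socket

# DefaultSelector picks the best mechanism available (epoll on Linux),
# which is not subject to select()'s FD_SETSIZE limit of 1024 fds.
sel = selectors.DefaultSelector()

socks = [socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for _ in range(5)]
for s in socks:
    s.setblocking(False)
    sel.register(s, selectors.EVENT_WRITE)

# UDP sockets are immediately writable, so a single poll returns them all.
events = sel.select(timeout=1)
print(len(events))  # 5

for s in socks:
    sel.unregister(s)
    s.close()
sel.close()
```

With epoll the same pattern scales to tens of thousands of registered sockets, bounded by ulimit rather than by FD_SETSIZE.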

Regards,

pascoal asked Oct 24 '16

People also ask

How do you achieve parallelism in Python?

For parallelism, Python offers multiprocessing, which launches multiple instances of the Python interpreter, each running independently on its own hardware thread. All three of these mechanisms (threading, coroutines, and multiprocessing) have distinctly different use cases.

Is Python good at concurrency?

Yes, for many workloads. Processes running on different cores can actually run at the same time, which is fabulous. There are some complications that arise from doing this, but Python does a pretty good job of smoothing them over most of the time. For threads, the operating system decides when to switch tasks, externally to Python.

Which is better multiprocessing or multithreading in Python?

Multiprocessing is easier to drop in than threading, but has a higher memory overhead. If your code is CPU bound, multiprocessing is most likely the better choice, especially if the target machine has multiple cores or CPUs.

Is it a good idea to use multi thread to speed your Python code?

Not for CPU-bound code: the Global Interpreter Lock allows only one thread to execute Python bytecode at a time, so threads do not speed up CPU-bound work. For I/O-bound code, however, threads can help, because the GIL is released while a thread waits on I/O.
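The I/O-bound case can be illustrated with a small sketch in which time.sleep stands in for a blocking network send (an assumption, since sleep releases the GIL just as real I/O does). Ten 0.2-second waits overlap, so the total wall time stays near 0.2 seconds rather than 2 seconds:

```python
import threading
import time

results = []

def io_task(i):
    # time.sleep releases the GIL, standing in for a blocking send/recv.
    time.sleep(0.2)
    results.append(i)

start = time.monotonic()
threads = [threading.Thread(target=io_task, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.monotonic() - start

print(sorted(results))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# elapsed is ~0.2 s: the waits ran concurrently, not back to back.
```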


1 Answer

When using parallelism in Python, a good approach is to use either ThreadPoolExecutor or ProcessPoolExecutor from concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures); these work well in my experience.

An example using ThreadPoolExecutor that can be adapted to your use case:

```python
import concurrent.futures
import time

IPs = ['168.212.226.204',
       '168.212.226.204',
       '168.212.226.204',
       '168.212.226.204',
       '168.212.226.204']

def send_pkt(x):
    status = 'Failed'
    while True:
        # send pkt
        time.sleep(10)
        status = 'Successful'
        break
    return status

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_ip = {executor.submit(send_pkt, ip): ip for ip in IPs}
    for future in concurrent.futures.as_completed(future_to_ip):
        ip = future_to_ip[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (ip, exc))
        else:
            print('%r sent %s' % (ip, data))
```
Carl Kristensen answered Nov 09 '22