 

How to apply parallel or asynchronous I/O to file writing in a piece of Python code

To begin with, we're given the following piece of code:

from validate_email import validate_email
import time
import os

def verify_emails(email_path, good_filepath, bad_filepath):
    good_emails = open(good_filepath, 'w+')
    bad_emails = open(bad_filepath, 'w+')

    emails = set()

    with open(email_path) as f:
        for email in f:
            email = email.strip()

            if email in emails:
                continue
            emails.add(email)

            if validate_email(email, verify=True):
                good_emails.write(email + '\n')
            else:
                bad_emails.write(email + '\n')

if __name__ == "__main__":
    os.system('cls')
    verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")

I expect contacting the SMTP servers to be by far the most expensive part of my program when emails.txt contains a large number of lines (>1k). Using some form of parallel or asynchronous I/O should speed this up a lot, since I could wait for multiple servers to respond at once instead of waiting for each one sequentially.

As far as I have read:

Asynchronous I/O operates by queuing a request for I/O to the file descriptor, tracked independently of the calling process. For a file descriptor that supports asynchronous I/O (raw disk devices, typically), a process can call aio_read() (for instance) to request that a number of bytes be read from the file descriptor. The system call returns immediately, whether or not the I/O has completed. Some time later, the process then polls the operating system for the completion of the I/O (that is, the buffer is filled with data).
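From what I can tell, that "queue the request now, collect the result later" idea would map onto something like the following pattern in Python (a rough sketch only, using a thread-backed pool rather than real aio_read; slow_check and the sample addresses are made up):

from multiprocessing.dummy import Pool  # thread-backed pool, same API as multiprocessing.Pool
from validate_email import validate_email

def slow_check(email):
    # Stand-in for any blocking call, e.g. the SMTP round trip.
    return validate_email(email, verify=True)

pool = Pool(8)
emails = ["a@example.com", "b@example.com"]

# Submitting returns immediately with an AsyncResult handle;
# the work proceeds in the background.
pending = [(e, pool.apply_async(slow_check, (e,))) for e in emails]

# ... the program is free to do other things here ...

# Later, collect the results (get() blocks only until that request is done).
results = [(e, handle.get()) for e, handle in pending]
pool.close()
pool.join()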

To be honest, I still didn't quite understand how to implement async I/O in my program. Could anybody take a little time and explain the whole process to me?


EDIT, as Parakleta suggested:

from validate_email import validate_email
import time
import os
from multiprocessing import Pool
import itertools

def validate_map(e):
    return (validate_email(e.strip(), verify=True), e)

seen_emails = set()
def unique(e):
    if e in seen_emails:
        return False
    seen_emails.add(e)
    return True

def verify_emails(email_path, good_filepath, bad_filepath):
    good_emails = open(good_filepath, 'w+')
    bad_emails = open(bad_filepath, 'w+')

    with open(email_path, "r") as f:
        for result in Pool().imap_unordered(validate_map,
                                    itertools.ifilter(unique, f):
        (good, email) = result
        if good:
            good_emails.write(email)
        else:
            bad_emails.write(email)
        good_emails.close()
        bad_emails.close()

if __name__ == "__main__":
    os.system('cls')
    verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")
asked Oct 08 '15 by Cajuu'


1 Answer

You're asking the wrong question

Having looked at the validate_email package, your real problem is that you're not batching your queries efficiently. You should do the MX lookup only once per domain, connect to each MX server only once, go through the handshake, and then check all of the addresses for that server in a single batch. Thankfully the validate_email package caches the MX results for you, but you still need to group the email addresses by server to batch the query to the server itself.
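As a rough sketch of that grouping step (the names here, like group_by_domain, are only illustrative; the batched per-connection verification itself would still have to be added inside validate_email):

from collections import defaultdict

def group_by_domain(email_iter):
    # Bucket unique addresses by domain so each MX server only needs
    # to be contacted once per batch.
    by_domain = defaultdict(list)
    seen = set()
    for email in email_iter:
        email = email.strip()
        if not email or email in seen:
            continue
        seen.add(email)
        domain = email.rsplit('@', 1)[-1].lower()
        by_domain[domain].append(email)
    return by_domain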

You would need to edit the validate_email package to implement batching, and then probably give each domain its own thread using the actual threading library rather than multiprocessing.
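If you did add such a batching helper, the dispatch could look roughly like this (validate_domain_batch is hypothetical: it's the function you would have to write inside validate_email to connect to a domain's MX once and verify a whole list of addresses over that connection):

from multiprocessing.dummy import Pool as ThreadPool  # thread-backed pool

def check_domain(item):
    domain, addresses = item
    # validate_domain_batch(domain, addresses) is hypothetical: it should
    # return one boolean per address, in order, using a single connection.
    return zip(addresses, validate_domain_batch(domain, addresses))

def verify_by_domain(by_domain, workers=8):
    # One worker thread per in-flight domain; results arrive as each
    # domain's batch finishes.
    pool = ThreadPool(workers)
    results = {}
    for pairs in pool.imap_unordered(check_domain, by_domain.items()):
        for address, ok in pairs:
            results[address] = ok
    pool.close()
    pool.join()
    return results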

It's always important to profile your program if it's slow and figure out where it is actually spending the time rather than trying to apply optimisation tricks blindly.
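For example, a quick pass with cProfile (assuming it's run from the same script that defines verify_emails) will show which calls actually dominate:

import cProfile

# Profile a single run and sort by cumulative time to see which calls
# (SMTP handshakes, DNS lookups, file IO, ...) take most of the time.
cProfile.run(
    'verify_emails("emails.txt", "good_emails.txt", "bad_emails.txt")',
    sort='cumulative')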

The requested solution

IO is already asynchronous if you are using buffered IO and your use case fits the OS's buffering. The only place you could potentially get some advantage is in read-ahead, but Python already does this if you use iterator access to a file (which you are doing). Async IO is an advantage for programs that move large amounts of data and have disabled the OS buffers to prevent copying the data twice.

You need to actually profile/benchmark your program to see if it has any room for improvement. If your disks aren't already throughput bound then there is a chance to improve performance by processing each email (address?) in parallel. The easiest way to check this is probably to see whether the core running your program is maxed out (i.e. you are CPU bound and not IO bound).

If you are CPU bound then you need to look at threading. Unfortunately, Python threads don't run in parallel unless you have non-Python work to be done, so instead you'll have to use multiprocessing (I'm assuming validate_email is a pure-Python function).

How exactly you proceed depends on where the bottlenecks in your program are and how much of a speed-up you need to reach the point where you are IO bound (since you cannot go any faster than that, you can stop optimising when you hit that point).

The emails set object is hard to share because you'll need to lock around it so it's probably best that you keep that in one thread. Looking at the multiprocessing library the easiest mechanism to use is probably Process Pools.

Using this, you would need to wrap your file iterable in itertools.ifilter to discard duplicates, then feed it into Pool.imap_unordered, iterate the results, and write them into your two output files.

Something like:

with open(email_path) as f:
    for result in Pool().imap_unordered(validate_map,
                                        itertools.ifilter(unique, f)):
        (good, email) = result
        if good:
            good_emails.write(email)
        else:
            bad_emails.write(email)

The validate_map function should be something simple like:

def validate_map(e):
    return (validate_email(e.strip(), verify=True), e)

The unique function should be something like:

seen_emails = set()
def unique(e):
    if e in seen_emails:
        return False
    seen_emails.add(e)
    return True

ETA: I just realised that validate_email is a library which actually contacts SMTP servers. Given that it's not busy in Python code, you can use threading. The threading API, though, is not as convenient as the multiprocessing library, but you can use multiprocessing.dummy to get a thread-based Pool.

If you are CPU bound then it's not really worth having more threads/processes than cores, but since your bottleneck is network IO you can benefit from many more threads/processes. Since processes are expensive you want to swap to threads and then crank up the number running in parallel (although you should be polite and not DOS-attack the servers you are connecting to).

Consider from multiprocessing.dummy import Pool as ThreadPool and then call ThreadPool(processes=32).imap_unordered().
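Putting that together with the validate_map and unique helpers above, a thread-based version could look roughly like this (a sketch, not a drop-in replacement: it keeps the Python 2 itertools.ifilter used above, assumes validate_map and unique are already defined, and the pool size of 32 is only an example to tune):

from multiprocessing.dummy import Pool as ThreadPool  # threads, same Pool API
import itertools

def verify_emails(email_path, good_filepath, bad_filepath):
    pool = ThreadPool(processes=32)
    with open(email_path) as f, \
         open(good_filepath, 'w') as good_emails, \
         open(bad_filepath, 'w') as bad_emails:
        # Many SMTP conversations are now in flight at once; results come
        # back in whatever order the servers answer.
        for good, email in pool.imap_unordered(validate_map,
                                               itertools.ifilter(unique, f)):
            (good_emails if good else bad_emails).write(email)
    pool.close()
    pool.join()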

answered Nov 14 '22 by Parakleta