Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiple threads writing to the same CSV in Python

I'm new to multi-threading in Python and am currently writing a script that appends to a csv file. If I was to have multiple threads submitted to an concurrent.futures.ThreadPoolExecutor that appends lines to a csv file. What could I do to guarantee thread safety if appending was the only file-related operation being done by these threads?

Simplified version of my code:

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    for count,ad_id in enumerate(advertisers):

        downloadFutures.append(executor.submit(downloadThread, arguments.....))
        time.sleep(random.randint(1,3)) 

And my thread class being:

def downloadThread(arguments......):

                #Some code.....

                writer.writerow(re.split(',', line.decode()))

Should I set up a seperate single-threaded executor to handle writing or is it woth worrying about if I am just appending?

EDIT: I should elaborate that when the write operations occur can vary greatly with minutes between when the file is next appended to, I am just concerned that this scenario has not occurred when testing my script and I would prefer to be covered for that.

like image 403
GreenGodot Avatar asked Oct 13 '15 15:10

GreenGodot


People also ask

Can Python run multiple threads?

To recap, threading in Python allows multiple threads to be created within a single process, but due to GIL, none of them will ever run at the exact same time. Threading is still a very good option when it comes to running multiple I/O bound tasks concurrently.

Can multiple threads write to the same file?

Multiple threads can read and write the same file in several situations: Multiple threads read the same file at the same time. In this case, there is no conflict. If multiple threads write the same file at the same time, write data will be lost.

Is CSV writer thread safe?

CSVWriter is not thread-safe #78.


2 Answers

I am not sure if csvwriter is thread-safe. The documentation doesn't specify, so to be safe, if multiple threads use the same object, you should protect the usage with a threading.Lock:

# create the lock
import threading
csv_writer_lock = threading.Lock()

def downloadThread(arguments......):
    # pass csv_writer_lock somehow
    # Note: use csv_writer_lock on *any* access
    # Some code.....
    with csv_writer_lock:
        writer.writerow(re.split(',', line.decode()))

That being said, it may indeed be more elegant for the downloadThread to submit write tasks to an executor, instead of explicitly using locks like this.

like image 191
Claudiu Avatar answered Oct 09 '22 19:10

Claudiu


Way-late-to-the-party note: You could handle this a different way with no locking by having a single writer consuming from a shared Queue, with rows being pushed to the Queue by the threads doing the processing.

from threading import Thread
from queue import Queue
from concurrent.futures import ThreadPoolExecutor


# CSV writer setup goes here

queue = Queue()


def consume():
    while True:
        if not queue.empty():
            i = queue.get()
            
            # Row comes out of queue; CSV writing goes here
            
            print(i)
            if i == 4999:
                return


consumer = Thread(target=consume)
consumer.setDaemon(True)
consumer.start()


def produce(i):
    # Data processing goes here; row goes into queue
    queue.put(i)


with ThreadPoolExecutor(max_workers=10) as executor:
    for i in range(5000):
        executor.submit(produce, i)

consumer.join()
like image 16
kungphu Avatar answered Oct 09 '22 20:10

kungphu