I'm writing an app that appends lines to the same file from multiple threads.
I have a problem in which some lines are appended without a new line.
Any solution for this?
    class PathThread(threading.Thread):
        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue

        def printfiles(self, p):
            for path, dirs, files in os.walk(p):
                for f in files:
                    print(f, file=output)

        def run(self):
            while True:
                path = self.queue.get()
                self.printfiles(path)
                self.queue.task_done()

    pathqueue = Queue.Queue()
    paths = getThisFromSomeWhere()
    output = codecs.open('file', 'a')

    # spawn threads
    for i in range(0, 5):
        t = PathThread(pathqueue)
        t.setDaemon(True)
        t.start()

    # add paths to queue
    for path in paths:
        pathqueue.put(path)

    # wait for queue to get empty
    pathqueue.join()
Writing to the same file from multiple threads concurrently is not thread-safe and may result in a race condition. Thread-safe means that code can be called from more than one thread at the same time without causing a race condition.
There are two cases when multiple threads access the same file. If multiple threads only read the file at the same time, there is no conflict. If multiple threads write to the file at the same time without coordination, written data may be lost or interleaved.
Appending to a file from multiple threads without synchronization is not thread-safe and can result in overwritten data and file corruption.
Writing to a file can be made thread-safe by using a mutual exclusion (mutex) lock.
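A minimal sketch of the lock approach, assuming Python 3. The names `write_lock`, `append_line`, `worker` and `run_demo` are made up for illustration and are not from the question. Every thread shares one file handle, but the text and its newline are written while holding the lock, so no other thread can get in between them.

```python
import threading

# Hypothetical name: one lock shared by every thread that writes to the file.
write_lock = threading.Lock()


def append_line(handle, text):
    """Write text plus its newline while holding the lock."""
    with write_lock:
        handle.write(text + "\n")


def worker(handle, label, count):
    """Append `count` lines tagged with this worker's label."""
    for i in range(count):
        append_line(handle, "%s line %d" % (label, i))


def run_demo(path, n_threads=5, count=200):
    """Start the workers on one shared handle and return the lines written."""
    with open(path, "a") as handle:
        threads = [
            threading.Thread(target=worker, args=(handle, "thread-%d" % n, count))
            for n in range(n_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    with open(path) as handle:
        return handle.read().splitlines()
```

Calling `run_demo('some.txt')` should give back one complete line per write. The order of the lines still depends on how the threads are scheduled.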
The solution is to write to the file in one thread only.
    from __future__ import print_function  # only needed on Python 2

    import codecs
    import os
    import threading
    import Queue  # named queue in Python 3

    class PrintThread(threading.Thread):
        """The only thread that ever writes to the output file."""

        def __init__(self, queue):
            threading.Thread.__init__(self)
            self.queue = queue

        def printfiles(self, p):
            for path, dirs, files in os.walk(p):
                for f in files:
                    print(f, file=output)

        def run(self):
            while True:
                result = self.queue.get()
                self.printfiles(result)
                self.queue.task_done()

    class ProcessThread(threading.Thread):
        """Worker thread: processes a path and hands the result to the printer."""

        def __init__(self, in_queue, out_queue):
            threading.Thread.__init__(self)
            self.in_queue = in_queue
            self.out_queue = out_queue

        def run(self):
            while True:
                path = self.in_queue.get()
                result = self.process(path)
                self.out_queue.put(result)
                self.in_queue.task_done()

        def process(self, path):
            # Do the processing job here; this placeholder passes the path through
            return path

    pathqueue = Queue.Queue()
    resultqueue = Queue.Queue()
    paths = getThisFromSomeWhere()
    output = codecs.open('file', 'a')

    # spawn threads to process
    for i in range(0, 5):
        t = ProcessThread(pathqueue, resultqueue)
        t.setDaemon(True)
        t.start()

    # spawn a single thread to print
    t = PrintThread(resultqueue)
    t.setDaemon(True)
    t.start()

    # add paths to queue
    for path in paths:
        pathqueue.put(path)

    # wait for both queues to get empty
    pathqueue.join()
    resultqueue.join()
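For Python 3, the same single-writer idea can be sketched more compactly. This is only an illustration under assumptions: `_DONE`, `list_files`, `writer` and `collect` are hypothetical names, and a sentinel object is used to stop the writer instead of daemon threads.

```python
import os
import queue
import threading

# Hypothetical sentinel: tells the writer thread that no more results will come.
_DONE = object()


def list_files(roots, results):
    """Worker: walk each root and queue every file name found."""
    for root in roots:
        for _dirpath, _dirs, files in os.walk(root):
            for name in files:
                results.put(name)


def writer(results, out_path):
    """The only thread that touches the output file."""
    with open(out_path, "a") as out:
        while True:
            item = results.get()
            if item is _DONE:
                break
            out.write(item + "\n")


def collect(root_groups, out_path):
    """Run one worker per group of roots and funnel all output through one writer."""
    results = queue.Queue()
    writer_thread = threading.Thread(target=writer, args=(results, out_path))
    writer_thread.start()
    workers = [
        threading.Thread(target=list_files, args=(group, results))
        for group in root_groups
    ]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    results.put(_DONE)  # all workers are finished, so the writer can stop
    writer_thread.join()
```

Because only `writer` ever calls `write`, the worker threads never contend for the file handle, and `collect` returns only after everything queued has been written.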
The fact that you never see jumbled text on the same line or new lines in the middle of a line is a clue that you actually don't need to synchronize appending to the file. The problem is that you use print() to write to a single file handle. I suspect print() is actually doing two operations on the file handle in one call, and those operations are racing between the threads. Basically print() is doing something like:
    file_handle.write('whatever_text_you_pass_it')
    file_handle.write(os.linesep)
And because different threads are doing this simultaneously on the same file handle, sometimes one thread will get in its first write, then another thread will get in its first write, and you'll end up with two lines run together followed by two line endings in a row. Or really any permutation of these.
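To make that interleaving concrete, here is a small single-threaded replay of one unlucky schedule. It is only an illustration: `simulate_interleaving` is a made-up name, and an `io.StringIO` stands in for the shared file handle.

```python
import io


def simulate_interleaving():
    """Replay one unlucky schedule of two threads sharing a handle."""
    handle = io.StringIO()  # stands in for the shared file
    handle.write("a.txt")   # thread A writes its text
    handle.write("b.txt")   # thread B writes its text before A's newline
    handle.write("\n")      # thread A writes its newline
    handle.write("\n")      # thread B writes its newline
    return handle.getvalue()


print(repr(simulate_interleaving()))  # prints 'a.txtb.txt\n\n'
```

The two names end up on one line, followed by an empty line, which matches the symptom of lines being appended without a new line.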
The simplest way to get around this is to stop using print() and just use write() directly. Try something like this:
output.write(f + os.linesep)
This still seems dangerous to me. I'm not sure what guarantees you can expect with all the threads using the same file handle object and contending for its internal buffer. Personally I'd sidestep the whole issue and just have every thread get its own file handle. Also note that this relies on line buffering, so that every flush to the file ends on a line ending. Regular files are usually block-buffered by default, so to force line buffering pass 1 as the third argument of open(). You can test it out like this:
    #!/usr/bin/env python
    import sys
    import threading

    def hello(file_name, message, count):
        # each thread opens its own line-buffered handle (third argument is 1)
        with open(file_name, 'a', 1) as f:
            for i in range(0, count):
                # text mode translates '\n' to the platform's line ending
                f.write(message + '\n')

    if __name__ == '__main__':
        # start a file
        with open('some.txt', 'w') as f:
            f.write('this is the beginning' + '\n')

        # make 10 threads write a million lines to the same file at the same time
        threads = []
        for i in range(0, 10):
            threads.append(threading.Thread(
                target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000)))
            threads[-1].start()
        for t in threads:
            t.join()

        # check what the file ended up with: a mangled line would show up
        # as an extra entry in the set of unique lines
        uniq_lines = set()
        with open('some.txt', 'r') as f:
            for l in f:
                uniq_lines.add(l)
        for u in uniq_lines:
            sys.stdout.write(u)
The output looks like this:
    hey im thread 6
    hey im thread 7
    hey im thread 9
    hey im thread 8
    hey im thread 3
    this is the beginning
    hey im thread 5
    hey im thread 4
    hey im thread 1
    hey im thread 0
    hey im thread 2