I have the following Python program, which starts three processes that each write 10,000 random rows to the same file using an inherited file handle:
import multiprocessing
import random
import string
import traceback

if __name__ == '__main__':
    # clear out the file first
    open('out.txt', 'w')

    # initialise file handle to be inherited by sub-processes
    file_handle = open('out.txt', 'a', newline='', encoding='utf-8')

    process_count = 3

# routine to be run by sub-processes
# adds n lines to the file
def write_random_rows(n):
    try:
        letters = string.ascii_lowercase
        for _ in range(n):
            s = ''.join(random.choice(letters) for _ in range(100))
            file_handle.write(s + "\n")
    except Exception:
        traceback.print_exc()

if __name__ == '__main__':
    # initialise the multiprocessing pool
    process_pool = multiprocessing.Pool(processes=process_count)

    # write the rows
    for i in range(process_count):
        process_pool.apply_async(write_random_rows, (10000,))
        # write_random_rows(10000)

    # wait for the sub-processes to finish
    process_pool.close()
    process_pool.join()
As a result of running this, I expect the file to contain 30,000 rows. If I run write_random_rows(10000) inside my main loop (the commented-out line in the above program), 30,000 rows are written to the file as expected. However, if I run the non-commented line, process_pool.apply_async(write_random_rows, (10000,)), I end up with 15,498 rows in the file.
Strangely, no matter how many times I rerun this script, I always get the same (incorrect) number of rows in the output file.
I can fix this issue by initializing the file handle from within write_random_rows(), i.e. within the sub-process execution, which suggests that somehow the inherited file handles are interfering with each other. If it were related to some kind of race condition, though, I would expect the number of rows to change each time I ran the script. Why exactly does this issue occur?
This problem is due to a combination of:

- Forking a process results in the parent and child sharing a POSIX file descriptor. In the presence of raw writes this should not result in data loss, but, without any form of synchronisation between parent and child, it does result in the data from the processes being arbitrarily interleaved.
- In the presence of independent buffering by the processes, however, data may be lost, depending on how the buffered write is implemented.
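To see the first point in isolation, here is a minimal, POSIX-only sketch (separate from the program above; the filename shared.txt is only illustrative) in which parent and child write to the same inherited descriptor with raw os.write calls. No data is lost, but the ordering of the lines is arbitrary:

import os

fd = os.open("shared.txt", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644)
pid = os.fork()                       # the child inherits fd and its file offset
label = b"child\n" if pid == 0 else b"parent\n"
for _ in range(5):
    os.write(fd, label)               # raw write: no user-space buffer involved
if pid == 0:
    os._exit(0)                       # child exits without interpreter cleanup
os.waitpid(pid, 0)                    # parent waits for the child to finish
print(open("shared.txt").read())      # 10 lines in total, in arbitrary order

Because the forked child shares the parent's file description (and hence its file offset), each raw write lands after the previous one rather than overwriting it; only the ordering of parent and child lines is unpredictable.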
So ... a useful experiment in this case would involve replicating your problem with no buffering involved. This could be done in two ways:
- using an open(..., mode='ab', buffering=0) call ... and then, as this is a binary file, ensuring that all writes encode to bytes, i.e. using file_handle.write(bytes(s + "\n", encoding="utf-8")). Doing so results in a file with 30,000 lines of size 3,030,000 bytes, as expected (see the sketch after this list).
- jumping through some hoops to open the file as an io.TextIOWrapper with non-default options that disable the buffering. We are unable to control the flags we need via open, so instead create it as:

      import io

      file_handle = io.TextIOWrapper(
          io.BufferedWriter(
              io.FileIO("out.txt", mode="a"),
              buffer_size=1),
          newline='', encoding="utf-8",
          write_through=True)

  This will also result in a file of 30,000 lines of size 3,030,000 bytes (as expected).
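Concretely, the first (unbuffered, binary) variant amounts to replacing the handle creation and the worker function in the original program roughly as follows, keeping the imports and pool setup unchanged; this is a sketch of the change described above rather than a new approach:

file_handle = open('out.txt', mode='ab', buffering=0)

def write_random_rows(n):
    try:
        letters = string.ascii_lowercase
        for _ in range(n):
            s = ''.join(random.choice(letters) for _ in range(100))
            # the handle is unbuffered and binary, so encode each line to bytes
            file_handle.write(bytes(s + "\n", encoding="utf-8"))
    except Exception:
        traceback.print_exc()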
On Python 3.7, as commenters have noted, the original code results in a file with 29,766 lines rather than 30,000. This is 78 lines short per worker. Running that code with two workers produces a file with 19,844 lines (which is also 78 lines short per worker).
Why? It is standard practice to exit a forked child process using os._exit, and it appears that this is not flushing the remaining buffer in each child to disk ... this explains the missing 78 lines per child exactly.
The default buffer size (io.DEFAULT_BUFFER_SIZE) is 8192 bytes. Each line is 101 bytes (100 characters plus the newline), so the buffer overruns, and is flushed, every ceil(8192 / 101) = 82 lines. That is, 81 lines will almost fill the buffer and the 82nd line will cause the preceding 81 lines and itself to be flushed. After 10,000 iterations, that leaves 10,000 % 82 = 78 lines remaining in the buffer in each child.

Thus it would appear the missing data is buffered data that has not been flushed. So, making the following change:
def write_random_rows(n):
    ...
    except Exception:
        traceback.print_exc()

    # flush the file
    file_handle.flush()
will result in the desired 30,000 lines.
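For reference, the buffer arithmetic above can be checked in a few lines; this assumes the 8192-byte io.DEFAULT_BUFFER_SIZE figure quoted above, although the buffer size open() actually picks can vary by platform:

import io
import math

line_size = 100 + 1                             # 100 letters plus the "\n"
flush_every = math.ceil(io.DEFAULT_BUFFER_SIZE / line_size)
print(io.DEFAULT_BUFFER_SIZE)                   # 8192
print(flush_every)                              # 82: lines per buffer flush
print(10000 % flush_every)                      # 78: lines left unflushed per child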
NOTE: In either case, it is almost always better to ensure a child process is not sharing a file handle, either by deferring the open to the child or by dup'ing any open file handles across a fork.
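As a sketch of the first option (deferring the open to the child), each pool worker can open its own handle in a Pool initializer; here the handle is opened line-buffered so that every completed line reaches the file even though workers ultimately exit via os._exit. This is one possible arrangement, not the only correct one:

import multiprocessing
import random
import string

def init_worker():
    # each worker process gets its own handle, so no buffer is shared
    # across the fork; buffering=1 selects line buffering in text mode,
    # so each completed line is flushed to the file immediately
    global file_handle
    file_handle = open('out.txt', 'a', buffering=1, newline='', encoding='utf-8')

def write_random_rows(n):
    letters = string.ascii_lowercase
    for _ in range(n):
        s = ''.join(random.choice(letters) for _ in range(100))
        file_handle.write(s + "\n")

if __name__ == '__main__':
    process_pool = multiprocessing.Pool(processes=3, initializer=init_worker)
    for _ in range(3):
        process_pool.apply_async(write_random_rows, (10000,))
    process_pool.close()
    process_pool.join()

Because each flush writes one complete line to a file opened in append mode, the total line count is preserved no matter how the workers' writes interleave.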