Suppose I have this Python code:
```python
from itertools import count, tee
original = count()  # just an example, can be another iterable
a, b = tee(original)
```
The question is: will there be any problem if I start iterating `a` in one thread and, at the same time, iterating `b` in another thread? Clearly, `a` and `b` share some data (the original iterable, plus some additional state such as internal buffers). So, will `a.next()` and `b.next()` do the appropriate locking when they access this shared data?
Update! The segfaults caused by `tee` have been fixed in later releases of Python 2.7 and 3.7, and in 3.8 and above. You still need to manage concurrent access yourself for thread safety, and you can use my solution below.
If the original iterator `it` was written in Python, like a class instance or a generator, then `itertools.tee(it)` is not thread-safe. In the best case you'll only get an exception (which you will), and in the worst case Python will crash.

Instead of using `tee`, here is a wrapper class and function that are thread-safe:
```python
from itertools import tee
from threading import Lock


class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock

    def __iter__(self):
        return self

    def __next__(self):
        # All copies share one lock, so only one thread at a time
        # can be inside the underlying tee object's __next__.
        with self.lock:
            return next(self.teeobj)

    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)


def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
```
I'll now expand (a lot) on when `tee` is and isn't thread-safe, and why.
```python
>>> from itertools import tee, count
>>> from threading import Thread
>>> def limited_sum(it):
...     s = 0
...     for elem, _ in zip(it, range(1000000)):
...         s += elem
...     print(s)
...
>>> a, b = tee(count())
>>> [Thread(target=limited_sum, args=(it,)).start() for it in [a, b]]
# prints 499999500000 twice, which is indeed 0 + 1 + ... + 999999
```
`itertools.count` is written entirely in C in the file `Modules/itertoolsmodule.c` of the CPython project, so it works just fine. The same is true for lists, tuples, sets, `range`, dictionaries (keys, values and items), `collections.defaultdict` (keys, values and items), and a few others.
```python
>>> gen = (i for i in range(1000000))
>>> a, b = tee(gen)
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Exception in thread Thread-10:
Traceback (most recent call last):
  File "/usr/lib/python3.4/threading.py", line 920, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.4/threading.py", line 868, in run
    self._target(*self._args, **self._kwargs)
ValueError: generator already executing
```
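The `ValueError` is raised by the generator itself, not by `tee`: a generator refuses to be resumed while one of its steps is still running. The minimal single-threaded sketch below triggers the same error without any threads, by having the generator ask for its own next value mid-step; `reenter` is a hypothetical name used only for illustration. In the threaded example, thread 2 plays the role of the inner `next(gen)` call.

```python
def reenter():
    # Calling next() on this same generator while it is already running
    # mimics a second thread calling next(gen) in the middle of a step.
    yield next(gen)


gen = reenter()
try:
    next(gen)
except ValueError as exc:
    message = str(exc)

print(message)  # generator already executing
```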
Yes, `tee` is written in C, and it is true that, thanks to the GIL, only one bytecode executes at a time. But the above example shows that this is not enough to ensure thread safety. Somewhere along the line, this is what happened:

1. The two threads have called `next` on their `tee_object` instances the same number of times.
2. Thread 1 calls `next(a)`.
3. It needs a new element, so thread 1 now calls `next(gen)`.
4. `gen` is written in Python. On, say, the first bytecode of `gen.__next__`, CPython decides to switch threads.
5. Thread 2 resumes and calls `next(b)`.
6. It also needs a new element, so it calls `next(gen)`.
7. Since `gen.__next__` is already running in thread 1, we get an exception.

An iterator written as a Python class fares even worse:

```python
>>> from itertools import tee
>>> from threading import Thread
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...     def __iter__(self):
...         return self
...     def __next__(self):
...         self.i -= 1
...         if self.i < 0:
...             raise StopIteration
...         return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
```
The above code crashes in Python 2.7.13 and 3.6 (and probably in every CPython version that predates the fix mentioned in the update above), on Ubuntu, Windows 7 and OS X. I don't want to reveal the reason just yet; there is one more step first.
```python
>>> from itertools import tee
>>> from threading import Thread, Lock
>>> class countdown(object):
...     def __init__(self, n):
...         self.i = n
...         self.lock = Lock()
...     def __iter__(self):
...         return self
...     def __next__(self):
...         with self.lock:
...             self.i -= 1
...             if self.i < 0:
...                 raise StopIteration
...             return self.i
...
>>> a, b = tee(countdown(100000))
>>> [Thread(target=sum, args=(it,)).start() for it in [a, b]]
Segmentation fault (core dumped)
```
Adding a lock inside our iterator is not enough to make `tee` thread-safe.
The crux of the matter is the `getitem` method of `teedataobject` in the file `Modules/itertoolsmodule.c` of CPython. The implementation of `tee` is really cool, with an optimization that saves on memory allocations: `tee` returns "tee objects", each of which holds a reference to a head `teedataobject`. These in turn are like links in a linked list, but instead of holding a single element, they hold 57. This isn't really important for our purposes, but it is what it is. Here is the `getitem` function of `teedataobject`:
```c
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;
    assert(i < LINKCELLS);
    if (i < tdo->numread)
        value = tdo->values[i];
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}
```
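For readers who find Python easier to follow than C, here is a rough pure-Python model of the structure described above. `TeeData`, `TeeReader` and `model_tee` are hypothetical names made up for this sketch. `TeeData.getitem` mirrors `teedataobject_getitem`, and `TeeReader.__next__` adds the jump to the next 57-cell link that a tee object does when it reaches the end of its current link. It is a simplification (no reference counting, copying or pickling), and, like the real thing, it is not thread-safe.

```python
LINKCELLS = 57  # number of cells per link, as in CPython's itertoolsmodule.c


class TeeData:
    """Rough model of one teedataobject: a fixed-size chunk of buffered values."""

    def __init__(self, it):
        self.it = it                       # the shared source iterator
        self.values = [None] * LINKCELLS   # buffered elements of this link
        self.numread = 0                   # how many cells are filled so far
        self.nextlink = None               # the following link, created lazily

    def getitem(self, i):
        assert i < LINKCELLS
        if i < self.numread:
            return self.values[i]          # already buffered by a faster reader
        # this is the lead reader, so fetch more data (may raise StopIteration)
        assert i == self.numread
        value = next(self.it)
        self.numread += 1
        self.values[i] = value
        return value

    def jumplink(self):
        if self.nextlink is None:
            self.nextlink = TeeData(self.it)
        return self.nextlink


class TeeReader:
    """Rough model of a tee object: a cursor (link, index) into the shared chain."""

    def __init__(self, data):
        self.data = data
        self.index = 0

    def __iter__(self):
        return self

    def __next__(self):
        if self.index >= LINKCELLS:        # end of this link: move to the next one
            self.data = self.data.jumplink()
            self.index = 0
        value = self.data.getitem(self.index)
        self.index += 1
        return value


def model_tee(iterable, n=2):
    head = TeeData(iter(iterable))
    return tuple(TeeReader(head) for _ in range(n))


a, b = model_tee(range(200))
first = list(a)    # a runs ahead, filling the links
second = list(b)   # b replays the buffered values
print(first == list(range(200)), second == first)  # True True
```

Because every reader shares the same chain of links, whichever reader is ahead pays for the `next` call on the source and the others just read the buffer.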
When asked for an element, `teedataobject` checks whether it has one prepared. If it does, it returns it. If it doesn't, it calls `next` on the original iterator. This is where, if the iterator is written in Python, the C code can be interrupted: running Python bytecode gives CPython a chance to switch threads. So here's the problem:
1. The two threads have called `next` the same number of times.
2. Thread 1 calls `next(a)`, and the C code gets to the `PyIter_Next` call above. On, say, the first bytecode of `next(gen)`, CPython decides to switch threads.
3. Thread 2 calls `next(b)`, and since it still needs a new element, the C code gets to the `PyIter_Next` call.

   At this point both threads are in the same place, with the same values for `i` and `tdo->numread`. Note that `tdo->numread` is simply a variable that keeps track of where in the 57-cell link the `teedataobject` should write next.
4. Thread 2 finishes its call to `PyIter_Next`, increments `tdo->numread`, stores its element in `tdo->values[i]` and returns it. At some point CPython decides to switch threads again.
5. Thread 1 resumes, finishes its call to `PyIter_Next`, and then runs the two lines `tdo->numread++;` and `tdo->values[i] = value;`.
6. But thread 2 has already set `tdo->values[i]`!
This is already enough to show that `tee` is not thread-safe, since we lose the value that thread 2 put in `tdo->values[i]`. But this doesn't explain the crashing.
Say `i` was 56. Since both threads call `tdo->numread++`, it now gets to 58, which is above 57, the allocated size of `tdo->values`. After thread 1 moves on as well, the object `tdo` has no more references and is ready to be deleted. This is the clear function for `teedataobject`:
```c
static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;
    Py_CLEAR(tdo->it);
    for (i=0 ; i<tdo->numread ; i++)
        Py_CLEAR(tdo->values[i]); // <----- PROBLEM!!!
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}
```
At the line marked "PROBLEM", CPython will try to clear `tdo->values[57]`. This is where the crash happens. Well, some of the time: there's more than one place where it can crash, I just wanted to show one.
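The interleaving above can be replayed deterministically in plain Python, without threads, by running each thread's steps by hand in the order described. This is only a simulation: the list and counters are stand-ins for `tdo->values` and `tdo->numread`, the starting state (56 cells already filled) is a hypothetical chosen to match the "say `i` was 56" case, and Python reports an `IndexError` where C touches memory past the end of the array.

```python
LINKCELLS = 57                    # cells per teedataobject link in CPython

values = [None] * LINKCELLS       # one link's fixed-size buffer
numread = 56                      # hypothetical: 56 cells already filled
source = iter(range(100, 200))    # stands in for the Python-level iterator
i = 56                            # both threads ask for the same cell

# Both threads have passed the `i < numread` check (56 < 56 is false)
# and are inside PyIter_Next; thread 2 happens to finish first.
value2 = next(source)             # thread 2 gets its element ...
numread += 1
values[i] = value2                # ... and stores it in cell 56

value1 = next(source)             # thread 1 resumes and gets another element
numread += 1                      # second increment for the same cell
values[i] = value1                # overwrites thread 2's element

survivor = values[i]
print(numread)                    # 58: past the 57 allocated cells
print(survivor == value2)         # False: value2 is gone from the buffer

# teedataobject_clear walks cells 0 .. numread-1. In C, cell 57 lies
# outside the array; in this Python model the same walk raises IndexError.
try:
    for k in range(numread):
        values[k] = None
except IndexError:
    print("out of bounds at cell", k)   # out of bounds at cell 57
```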
Now you know: `itertools.tee` is not thread-safe.
Instead of locking inside our iterator's `__next__`, we can put a lock around `tee.__next__`. Since all the tee objects share the same lock, only one thread at a time can be inside `teedataobject_getitem`. I gave a short implementation at the beginning of this answer. It is a drop-in replacement for `tee` that is thread-safe. The only thing it doesn't implement that `tee` does is pickling. Since locks aren't picklable, it's not trivial to add this. But, of course, it can be done.
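A quick way to exercise the wrapper is to feed it the kind of source that broke plain `tee` earlier: a Python generator consumed from two threads at once. The block restates `safetee` from the top of the answer so that it runs on its own; `N`, `results` and `consume` are illustrative names, and the check assumes that both threads should see every element exactly once.

```python
from itertools import tee
from threading import Lock, Thread


class safeteeobject(object):
    """tee object wrapped to make it thread-safe (same as above)"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:  # one shared lock guards all copies
            return next(self.teeobj)

    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)


def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))


N = 100000
gen = (i for i in range(N))  # a Python generator: unsafe with plain tee
results = [None, None]


def consume(idx, it):
    results[idx] = sum(it)


threads = [Thread(target=consume, args=(idx, it))
           for idx, it in enumerate(safetee(gen))]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)  # [4999950000, 4999950000]
```

Holding the lock while the generator runs is what keeps the second thread from entering `gen.__next__` (and `teedataobject_getitem`) mid-step. The trade-off is that the threads are serialized whenever they pull from the iterator.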