Are the results of itertools.tee() thread-safe? (Python)
Suppose I have this Python code:
```python
from itertools import count, tee
original = count()  # just an example, can be another iterable
a, b = tee(original)
```
The question is, will there be any problem if I start iterating "a" in one thread and, at the same time, iterating "b" in another thread? Clearly, a and b share some data (the original iterable, + some additional stuff, internal buffers or something). So, will a.next() and b.next() do the appropriate locking when they access this shared data?
5 Answers
Update! Segfaults caused by tee have been fixed in late versions of python 2.7, 3.7, 3.8, and anything above. You still need to manage concurrent access yourself for thread safety, and you can use my solution below.
tl;dr
In CPython, `itertools.tee` is thread safe *if and only if* the original iterator is implemented in C/C++, i.e. doesn't use **any** python.
If an original iterator `it` was written in python, like a class instance or a generator, then `itertools.tee(it)` is not thread-safe. In the best case scenario you'll only get an exception (which you will), and in the worst python will crash.

Instead of using `tee`, here is a wrapper class and function that are thread-safe:
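A minimal sketch of such a wrapper (the name `safeteeobject` and the exact structure here are illustrative): one lock is shared by all copies and guards every `next` call on the underlying tee object.

```python
from itertools import tee
from threading import Lock

class safeteeobject(object):
    """tee object wrapped to make it thread-safe"""
    def __init__(self, teeobj, lock):
        self.teeobj = teeobj
        self.lock = lock
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            return next(self.teeobj)
    def __copy__(self):
        return safeteeobject(self.teeobj.__copy__(), self.lock)

def safetee(iterable, n=2):
    """tuple of n independent thread-safe iterators"""
    lock = Lock()
    return tuple(safeteeobject(teeobj, lock) for teeobj in tee(iterable, n))
```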
I'll now expand (a lot) on when `tee` is and isn't thread-safe, and why.

Example where it's ok
Let's run some code (this is python 3 code, for python 2 use `itertools.izip` instead of `zip` to have same behavior):
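A sketch of such a test (the helper and the counts are illustrative): two threads each pull the same number of elements from their own copy of `tee(count())`, and we check that both saw the same sequence.

```python
from itertools import count, tee
from threading import Thread

def run(iterator, n, out):
    # consume n elements from one tee copy and record them
    for _, value in zip(range(n), iterator):
        out.append(value)

N = 100000
a, b = tee(count())
results_a, results_b = [], []
threads = [Thread(target=run, args=(a, N, results_a)),
           Thread(target=run, args=(b, N, results_b))]
for t in threads:
    t.start()
for t in threads:
    t.join()

# no exception, no crash: both threads saw the same first N integers
assert results_a == results_b == list(range(N))
```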
itertools.count is written entirely in C in the file
Modules/itertoolsmodule.c
of the CPython project, so it works just fine.The same is true for: lists, tuples, sets, range, dictionaries (keys, values and items),
collections.defaultdict
(keys, values and items), and a few others.Example where it doesn't work - Generators
A very short example is using a generator:
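A sketch of that, assuming an endless generator (details illustrative); on affected versions this typically dies with `ValueError: generator already executing`:

```python
from itertools import tee
from threading import Thread

def gen():
    # a pure-python iterator
    while True:
        yield 1

def consume(iterator, n):
    for _ in range(n):
        next(iterator)

a, b = tee(gen())
threads = [Thread(target=consume, args=(it, 100000)) for it in (a, b)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```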
Yes, `tee` is written in C, and it is true that the GIL executes one byte code at a time. But the above example shows that this is not enough to ensure thread safety. Somewhere along the line this is what happened:

1. The two threads have called `next` on their tee_object instances the same amount of times,
2. Thread 1 calls `next(a)`,
3. It needs a new element, so thread 1 now calls `next(gen)`,
4. `gen` is written in python. On, say, the first byte code of `gen.__next__`, CPython decides to switch threads,
5. Thread 2 resumes and calls `next(b)`,
6. It also needs a new element, so it calls `next(gen)`,
7. Since `gen.__next__` is already running in thread 1, we get an exception.

Example where it doesn't work - Iterator object
Ok, maybe it's just not thread-safe to use generators inside `tee`. Then we run a variant of the above code that uses an iterator object:
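A sketch of such a variant, replacing the generator with a hand-written iterator class (names illustrative):

```python
from itertools import tee
from threading import Thread

class CountingIterator(object):
    # a pure-python iterator object, no generator machinery involved
    def __init__(self):
        self.i = 0
    def __iter__(self):
        return self
    def __next__(self):
        self.i += 1
        return self.i

def consume(iterator, n):
    for _ in range(n):
        next(iterator)

a, b = tee(CountingIterator())
threads = [Thread(target=consume, args=(it, 100000)) for it in (a, b)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```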
The above code crashes in python 2.7.13 and 3.6 (and probably all cpython versions), on Ubuntu, Windows 7 and OSX. I don't want to reveal the reason just yet; one more step first.
What if I use locks inside my iterator?
Maybe the above code crashes because our iterator itself was not thread-safe. Let's add a lock and see what happens:
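For example (again, a sketch): the same iterator, now with its own lock around `__next__`:

```python
from itertools import tee
from threading import Lock

class LockedCountingIterator(object):
    def __init__(self):
        self.i = 0
        self.lock = Lock()
    def __iter__(self):
        return self
    def __next__(self):
        with self.lock:
            self.i += 1
            return self.i

a, b = tee(LockedCountingIterator())
# consuming a and b from two threads, as before, still crashes
```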
Adding a lock inside our iterator is not enough to make
tee
thread-safe.Why tee is not thread-safe
The crux of the matter is the `getitem` method of `teedataobject` in the file `Modules/itertoolsmodule.c` of CPython. The implementation of `tee` is really cool, with an optimization that saves RAM calls: `tee` returns "tee objects", each of which saves a reference to a head `teedataobject`. These in turn are like links in a linked list, but instead of holding a single element - they hold 57. This isn't really important for our purposes, but it is what it is. Here is the `getitem` function of `teedataobject`:
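Abridged from `Modules/itertoolsmodule.c` of the affected CPython versions (the exact code differs slightly between releases):

```c
static PyObject *
teedataobject_getitem(teedataobject *tdo, int i)
{
    PyObject *value;

    assert(i < LINKCELLS);              /* LINKCELLS == 57 */
    if (i < tdo->numread)
        value = tdo->values[i];         /* element already prepared */
    else {
        /* this is the lead iterator, so fetch more data */
        assert(i == tdo->numread);
        value = PyIter_Next(tdo->it);   /* may run python code */
        if (value == NULL)
            return NULL;
        tdo->numread++;
        tdo->values[i] = value;
    }
    Py_INCREF(value);
    return value;
}
```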
When asked for an element, `teedataobject` checks if it has one prepared. If it does then it returns it. If it doesn't then it calls `next` on the original iterator. This is where, if the iterator is written in python, the code can hang. So here's the problem:

1. Both threads have called `next` the same amount of times,
2. Thread 1 calls `next(a)`, and the C code gets to the `PyIter_Next` call above. On, say, the first byte code of `next(gen)`, CPython decides to switch threads.
3. Thread 2 calls `next(b)`, and since it still needs a new element, the C code gets to the `PyIter_Next` call.

At this point both threads are in the same place, with the same values for `i` and `tdo->numread`. Note that `tdo->numread` is simply a variable to keep track of where in the 57-cell link the `teedataobject` should write to next.

4. Thread 2 finishes its call to `PyIter_Next` and returns an element. At some point CPython decides to switch threads again,
5. Thread 1 resumes, finishes its call to `PyIter_Next`, and then runs the two lines:
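(the same two lines from the `getitem` function above):

```c
tdo->numread++;
tdo->values[i] = value;
```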
But thread 2 has already set `tdo->values[i]`!

This is already enough to show that `tee` is not thread-safe, since we lose the value that thread 2 put in `tdo->values[i]`. But this doesn't explain the crashing.

Say `i` was 56. Since both threads call `tdo->numread++`, it now gets to 58 - above 57, the allocated size of `tdo->values`. After thread 1 moves on as well, the object `tdo` has no more references and is ready to be deleted. This is the clear function for `teedataobject`:
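Again abridged from `Modules/itertoolsmodule.c`; the loop bound is the problem:

```c
static int
teedataobject_clear(teedataobject *tdo)
{
    int i;
    PyObject *tmp;

    Py_CLEAR(tdo->it);
    for (i = 0; i < tdo->numread; i++)
        Py_CLEAR(tdo->values[i]);   /* PROBLEM: numread may be 58 */
    tmp = tdo->nextlink;
    tdo->nextlink = NULL;
    teedataobject_safe_decref(tmp);
    return 0;
}
```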
At the line marked "PROBLEM", CPython will try to clear `tdo->values[57]`. This is where the crash happens. Well, some of the time. There's more than one place for crashing, I just wanted to show one.

Now you know - `itertools.tee` is not thread-safe.

One solution - External lock
Instead of locking inside our iterator's `__next__`, we can put a lock around `tee.__next__`. This means that the entire `teedataobject.__getitem__` method will be called by a single thread each time. I gave a short implementation at the beginning of this answer. It is a drop-in replacement for `tee` that is thread-safe. The only thing it doesn't implement which `tee` does - is pickling. Since locks aren't picklable, it's not trivial to add this. But, of course, it can be done.
If the equivalent code shown in the documentation, here:
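The rough-equivalent code from the itertools documentation (as published at the time) reads:

```python
import collections

def tee(iterable, n=2):
    it = iter(iterable)
    deques = [collections.deque() for i in range(n)]
    def gen(mydeque):
        while True:
            if not mydeque:             # when the local deque is empty
                try:
                    newval = next(it)   # fetch a new value and
                except StopIteration:
                    return
                for d in deques:        # load it to all the deques
                    d.append(newval)
            yield mydeque.popleft()
    return tuple(gen(d) for d in deques)
```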
is correct, then no, it will not be thread-safe.
Note that though deque is documented to have thread-safe append and pop, it does not make any guarantees for the code that uses it.
Since the main code could end up asking the underlying iterator for elements on multiple threads, you need to have a thread-safe collection and iterator as input in order for tee to be safe.
You can use this workaround to make tee thread-safe:
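A sketch of one such workaround (names illustrative): serialize every `next` on the underlying tee copies with one shared lock.

```python
from itertools import tee
from threading import Lock

def locked_iter(iterator, lock):
    # step the underlying iterator only while holding the shared lock
    while True:
        with lock:
            try:
                item = next(iterator)
            except StopIteration:
                return
        yield item

def thread_safe_tee(iterable, n=2):
    lock = Lock()
    return tuple(locked_iter(it, lock) for it in tee(iterable, n))
```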
In C-Python, `itertools.tee()` and the iterator it returns are implemented with C code. That means the GIL should protect it against being called simultaneously by multiple threads. It will probably work correctly, and it won't crash the interpreter, but it is not guaranteed to be thread-safe.

Simply put, don't take the risk.
I would like to share my experience of using itertools.tee to split a large flat text file into multiple CSV files, from/to S3, in Python 3.6.9 and 3.7.4 environments.

My data flow was: an S3 zipfile, an s3fs read iterator, a map iterator for a dataclass transform, a tee of that iterator, map iterators for dataclass filters, then loops over the iterators that capture the data and write it to S3 in CSV format with s3fs write, and/or write locally and s3fs put to S3.

itertools.tee failed on the zipfile process stack.

Dror Speiser's safetee above worked fine, but memory usage increased with any imbalance between the tee objects, e.g. when the dataset was not well distributed or one branch had processing delays.

Also, it did not work properly with multiprocessing-logging; this might be related to this bug: https://bugs.python.org/issue34410

The code below just adds simple flow control between the tee objects to prevent memory growth and OOM-killer situations.

Hope it's helpful for future reference.
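A minimal sketch of that idea, assuming a shared `Condition` and a cap on how far any copy may run ahead of the slowest one (class and parameter names are illustrative, not the original code):

```python
from itertools import tee
from threading import Condition

class BoundedTeeIterator(object):
    """One tee copy; the shared Condition blocks it once it gets more
    than max_gap items ahead of the slowest sibling copy."""
    def __init__(self, teeobj, state, max_gap):
        self.teeobj = teeobj
        self.state = state
        self.max_gap = max_gap
        self.idx = len(state['positions'])
        state['positions'].append(0)
    def __iter__(self):
        return self
    def __next__(self):
        state = self.state
        with state['cond']:
            # wait while we are too far ahead of the slowest copy
            while state['positions'][self.idx] - min(state['positions']) >= self.max_gap:
                state['cond'].wait()
            value = next(self.teeobj)  # under the lock: tee isn't thread-safe
            state['positions'][self.idx] += 1
            state['cond'].notify_all()
        return value

def bounded_tee(iterable, n=2, max_gap=1000):
    """Like tee(), but no copy can run more than max_gap items ahead."""
    state = {'cond': Condition(), 'positions': []}
    return tuple(BoundedTeeIterator(t, state, max_gap) for t in tee(iterable, n))
```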