0MQ:如何以线程安全的方式使用ZeroMQ?

发布于 2024-11-04 03:11:58 字数 1997 浏览 0 评论 0原文

我阅读了 ZeroMq 指南,我偶然发现了以下内容:

您不得在之间共享 ØMQ 套接字 线程。 ØMQ套接字不是 线程安全。技术上是可以的 做到这一点,但它需要信号量, 锁或互斥锁。这将使您的 应用程序缓慢且脆弱。唯一的 一个远程理智的地方 线程之间共享套接字位于 需要做的语言绑定 像垃圾收集一样神奇 套接字。

以及稍后:

记住:不要使用或关闭套接字,除非在创建它们的线程中。

我还了解 ZeroMQ Context 是线程安全的。

如果一个类注册另一个类的事件,在 .Net 中,该事件可能会从与创建侦听器的线程不同的线程调用。

我认为只有两个选项可以通过 ZeroMQ-Sockets 从事件处理程序中分派某些内容:

  • 将事件处理程序调用线程同步到在
  • 创建新 ZeroMQ 中创建 ZeroMQ-Socket 的线程-Socket / 通过使用线程安全的 ZeroMQ-Context 获取事件处理程序中线程的现有 ZeroMQ-Socket

0MQ-Guide 阻止第一个,我不认为为每个线程创建一个新的 ZeroMq-Socket 是高性能/可行的方法。

我的问题
从事件处理程序中通过 0MQ 发布消息的正确模式(本来的方式)是什么?

另外,该指南的作者在写下以下内容时是否考虑到了 .Net 的 ZeroMQ-Binding:

唯一的 一个远程理智的地方 线程之间共享套接字位于 需要做的语言绑定 像垃圾收集一样神奇 插座。 ?

这是一些示例代码来强调我的问题:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}

I read the ZeroMq guide and I stumbled upon the following:

You MUST NOT share ØMQ sockets between
threads. ØMQ sockets are not
threadsafe. Technically it's possible
to do this, but it demands semaphores,
locks, or mutexes. This will make your
application slow and fragile. The only
place where it's remotely sane to
share sockets between threads are in
language bindings that need to do
magic like garbage collection on
sockets.

and later on:

Remember: Do not use or close sockets except in the thread that created them.

I also understood that the ZeroMQ Context is threadsafe.

If a class registers for an event of a another class, in .Net, this event might be invoked from a different thread than the thread the listener was created on.

I think there are only two options to be able to dispatch something via ZeroMQ-Sockets from within an eventhandler:

  • Synchronize the eventhandler-invoking-thread to the thread the ZeroMQ-Socket was created in
  • Create a new ZeroMQ-Socket / get the exisiting ZeroMQ-Socket for the thread within the eventhandler by using the threadsafe ZeroMQ-Context

It seems that the 0MQ-Guide to discourage the first one and I don't think that creating a new ZeroMq-Socket for each thread is performant / the way to go.

My Question:
What is the correct pattern (the way it is meant to be) to publish messages via 0MQ from within an eventhandler?

Also, did the authors of the guide have the ZeroMQ-Binding for .Net in mind when they wrote:

The only
place where it's remotely sane to
share sockets between threads are in
language bindings that need to do
magic like garbage collection on
sockets. ?

Here is some samplecode to emphasize my problem/question:

public class ExampleClass
{
    public event EventHandler<ByteEventArgs> SomethinIsCalledFromAnotherThread;
}

public class ByteEventArgs : EventArgs
{
    public byte[] BytesToSend;
}


public class Dispatcher
{
    ZMQ.Context ctx;

    public Dispatcher(ZMQ.Context mqcontext, ExampleClass exampleClassInstance)
    {
        this.ctx = mqcontext;
        exampleClassInstance.SomethinIsCalledFromAnotherThread += new EventHandler<ByteEventArgs>(exampleClass_SomethinIsCalledFromAnotherThread);
    }

    void exampleClass_SomethinIsCalledFromAnotherThread(object sender, ByteEventArgs e)
    {
        // this method might be called by a different thread. So I have to get a new socket etc?
        using (var socket = ctx.Socket(ZMQ.SocketType.PUSH))
        {
            // init socket etc..... and finally: 
            socket.Send(e.BytesToSend);
        }
        // isn't that too much overhead?
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

赏烟花じ飞满天 2024-11-11 03:11:58

您可以创建大量 0MQ 套接字,当然数量与线程数量一样多。如果您在一个线程中创建套接字,并在另一个线程中使用它,则必须在两个操作之间执行完整的内存屏障。任何其他情况都会导致 libzmq 中奇怪的随机失败,因为套接字对象不是线程安全的。

有一些传统模式,但我不知道它们如何专门映射到 .NET:

  1. 在使用它们的线程中创建套接字,期间。在紧密绑定到一个进程的线程之间共享上下文,并在不紧密绑定的线程中创建单独的内容。在高级 C API (czmq) 中,这些线程称为附加线程和分离线程。
  2. 在父线程中创建套接字,并在线程创建时将套接字传递给附加线程。线程创建调用将执行完整的内存屏障。从那时起,在子线程中使用套接字。 “使用”意味着接收、发送、setsockopt、getsockopt 和关闭。
  3. 在一个线程中创建一个套接字,然后在另一个线程中使用,在每次使用之间执行您自己的完整内存屏障。这是非常微妙的,如果您不知道什么是“完整内存屏障”,则不应该这样做。

You can create lots of 0MQ sockets, certainly as many as you have threads. If you create a socket in one thread, and use it in another, you must execute a full memory barrier between the two operations. Anything else will result in weird random failures in libzmq, as socket objects are not threadsafe.

There are a few conventional patterns, though I don't know how these map specifically to .NET:

  1. Create sockets in the threads that use them, period. Share contexts between threads that are tightly bound into one process, and create separate contents in threads that are not tightly bound. In the high-level C API (czmq) these are called attached and detached threads.
  2. Create a socket in a parent thread and pass at thread creation time to an attached thread. The thread creation call will execute a full memory barrier. From then on, use the socket only in the child thread. "use" means recv, send, setsockopt, getsockopt, and close.
  3. Create a socket in one thread, and use in another, executing your own full memory barrier between each use. This is extremely delicate and if you don't know what a "full memory barrier" is, you should not be doing this.
海之角 2024-11-11 03:11:58

在.net Framework v4及更高版本中,您可以使用并发收集来解决这个问题。即生产者-消费者模式。多个线程(处理程序)可以将数据发送到线程安全队列,只有单个线程使用队列中的数据并使用套接字发送数据。

这个想法是这样的:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;

In .net framework v4 and up you can use concurrent collection to solve this problem. Namely Producer-Consumer pattern. Multiple threads (handlers) can send data to a thread-safe queue and just single thread consumes data from the queue and sends it using the socket.

Here is the idea:

sendQueue = new BlockingCollection<MyStuff>(new ConcurrentQueue<MyStuff>());
// concurrent queue can accept from multiple threads/handlers safely
MyHandler += (MyStuff stuffToSend) => sendQueue.Add(stuffToSend);

// start single-threaded data send loop
Task.Factory.StartNew(() => {
    using(var socket = context.Socket()) {
        MyStuff stuffToSend;
        // this enumerable will be blocking until CompleteAdding is called
        foreach(var stuff in sendQueue.GetConsumingEnumerable())
            socket.Send(stuff.Serialize());
    }
});

// break out of the send loop when done
OnMyAppExit += sendQueue.CompleteAdding;
何处潇湘 2024-11-11 03:11:58

不要忘记查看 inproc 传输。使用 inproc:// 套接字进行线程间通信并让一个线程打开套接字与其他进程/服务器通信可能会很有用。

每个线程仍然需要至少一个套接字,但 inproc 根本不涉及 IP 网络层。

Don't forget to have a look at the inproc transport. It might be useful to use inproc:// sockets for interthread communication and have one thread that opens sockets to talk to other processes/servers.

You still need at least one socket per thread, but the inproc ones do not involve the IP network layer at all.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文