同步异常

发布于 2024-08-27 03:48:56 字数 878 浏览 4 评论 0原文

我有两个线程,一个线程处理队列,另一个线程将内容添加到队列中。

  1. 我想在队列处理线程完成处理队列后将其置于睡眠状态
  2. 我想让第二个线程告诉它在向队列添加项目时唤醒

但是这些函数调用 System.Threading.SynchronizationLockException:对象同步方法是从 Monitor.PulseAll(waiting); 调用上的未同步代码块 调用的,因为我尚未将该函数与等待对象同步。 [我不想这样做,我希望能够在将项目添加到队列时进行处理]。我怎样才能实现这个目标?

Queue<object> items = new Queue<object>();
object waiting = new object();

第一个线程

public void ProcessQueue()
{
 while (true)
 {
   if (items.Count == 0)
     Monitor.Wait(waiting);

    object real = null;
    lock(items) {
    object item = items.Dequeue();
    real = item;
    }
    if(real == null)
        continue;
    .. bla bla bla
 } 
}

第二个线程涉及

public void AddItem(object o)
{
 ... bla bla bla
 lock(items)
 {
 items.Enqueue(o);
 }
 Monitor.PulseAll(waiting);
}

I have two threads, one thread processes a queue and the other thread adds stuff into the queue.

  1. I want to put the queue processing thread to sleep when its finished processing the queue
  2. I want to have the 2nd thread tell it to wake up when it has added an item to the queue

However these functions call System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of code on the Monitor.PulseAll(waiting); call, because I havent synchronized the function with the waiting object. [which I dont want to do, i want to be able to process while adding items to the queue]. How can I achieve this?

Queue<object> items = new Queue<object>();
object waiting = new object();

1st Thread

public void ProcessQueue()
{
 while (true)
 {
   if (items.Count == 0)
     Monitor.Wait(waiting);

    object real = null;
    lock(items) {
    object item = items.Dequeue();
    real = item;
    }
    if(real == null)
        continue;
    .. bla bla bla
 } 
}

2nd Thread involves

public void AddItem(object o)
{
 ... bla bla bla
 lock(items)
 {
 items.Enqueue(o);
 }
 Monitor.PulseAll(waiting);
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

谎言月老 2024-09-03 03:48:56

答案在您发布的错误消息中:
“从 Monitor.PulseAll(waiting) 上的未同步代码块调用对象同步方法;”

您必须从lock(waiting)块内部调用Monitor.PulseAll(waiting)。

另外...您还必须从锁块内调用 Monitor.Wait 。

The answer is in the error message you posted:
"Object synchronization method was called from an unsynchronized block of code on the Monitor.PulseAll(waiting);"

You have to call Monitor.PulseAll(waiting) from inside the lock(waiting) block.

Also... you have to call Monitor.Wait from within a lock block as well.

尬尬 2024-09-03 03:48:56

如果您可以访问.NET 4.0,则可以通过来实现您想要做的事情BlockingCollection
如果您想通过 Monitor 类自己完成此操作并使用 Pulse() 发出信号,那么您实际上走在正确的轨道上。
您会收到异常,因为调用 Wait()Pulse()PulseAll()您必须拥有指定对象的。您恰好在等待时错过了这个。

可以使用的示例基本线程安全队列:

  • 在消费者端使用 foreach
  • 在生产者端使用 while 或您最喜欢的条件构造,
  • 处理多个生产者/消费者,并且
  • 使用 lock()Monitor.Pulse()Monitor.PulseAll()Monitor.Wait() :

public class SignaledQueue<T>
{
    Queue<T> queue = new Queue<T>();
    volatile bool shutDown = false;

    public bool Enqueue(T item)
    {
        if (!shutDown)
        {
            lock (queue)
            {
                queue.Enqueue(item);
                //Pulse only if there can be waiters.
                if (queue.Count == 1)
                {
                    Monitor.PulseAll(queue);
                }
            }
            return true;
        }
        //Indicate that processing should stop.
        return false;
    }

    public IEnumerable<T> DequeueAll()
    {
        while (!shutDown)
        {
            do
            {
                T item;
                lock (queue)
                {
                    //If the queue is empty, wait.
                    if (queue.Count == 0)
                    {
                        if (shutDown) break;
                        Monitor.Wait(queue);
                        if (queue.Count == 0) break;
                    }
                    item = queue.Dequeue();
                }
                yield return item;
            } while (!shutDown);
        }
    }

    public void SignalShutDown()
    {
        shutDown = true;
        lock (queue)
        {
            //Signal all waiting consumers with PulseAll().
            Monitor.PulseAll(queue);
        }
    }

}

使用示例:

class Program
{
    static void Main(string[] args)
    {
        int numProducers = 4, numConsumers = 2;
        SignaledQueue<int> queue = new SignaledQueue<int>();

        ParameterizedThreadStart produce = delegate(object obj)
        {
            Random rng = new Random((int)obj);
            int num = 0;
            while (queue.Enqueue(++num))
            {
                Thread.Sleep(rng.Next(100));
            } 
        };

        ThreadStart consume = delegate
        {
            foreach (int num in queue.DequeueAll())
            {
                Console.Write(" {0}", num);
            }
        };

        Random seedRng = new Random();
        for (int i = 0; i < numProducers; i++)
        {
            new Thread(produce).Start(seedRng.Next());
        }

        for (int i = 0; i < numConsumers; i++)
        {
            new Thread(consume).Start();
        }

        Console.ReadKey(true);
        queue.SignalShutDown();

    }
}

If you have access to .NET 4.0, what you want to do can be achieved by BlockingCollection<T>.
If you want to do it yourself by means of the Monitor class and signaling with Pulse(), you are actually on the right track.
You get the exception because to call Wait(), Pulse() and PulseAll(), you have to own the lock on the specified object. You happen to miss this on waiting.

A sample basic thread-safe queue that can be used:

  • with foreach on the consumer,
  • with while or your favorite conditional construct on the producer side,
  • handles multiple producers/consumers and
  • uses lock(), Monitor.Pulse(), Monitor.PulseAll() and Monitor.Wait():

.

public class SignaledQueue<T>
{
    Queue<T> queue = new Queue<T>();
    volatile bool shutDown = false;

    public bool Enqueue(T item)
    {
        if (!shutDown)
        {
            lock (queue)
            {
                queue.Enqueue(item);
                //Pulse only if there can be waiters.
                if (queue.Count == 1)
                {
                    Monitor.PulseAll(queue);
                }
            }
            return true;
        }
        //Indicate that processing should stop.
        return false;
    }

    public IEnumerable<T> DequeueAll()
    {
        while (!shutDown)
        {
            do
            {
                T item;
                lock (queue)
                {
                    //If the queue is empty, wait.
                    if (queue.Count == 0)
                    {
                        if (shutDown) break;
                        Monitor.Wait(queue);
                        if (queue.Count == 0) break;
                    }
                    item = queue.Dequeue();
                }
                yield return item;
            } while (!shutDown);
        }
    }

    public void SignalShutDown()
    {
        shutDown = true;
        lock (queue)
        {
            //Signal all waiting consumers with PulseAll().
            Monitor.PulseAll(queue);
        }
    }

}

Sample usage:

class Program
{
    static void Main(string[] args)
    {
        int numProducers = 4, numConsumers = 2;
        SignaledQueue<int> queue = new SignaledQueue<int>();

        ParameterizedThreadStart produce = delegate(object obj)
        {
            Random rng = new Random((int)obj);
            int num = 0;
            while (queue.Enqueue(++num))
            {
                Thread.Sleep(rng.Next(100));
            } 
        };

        ThreadStart consume = delegate
        {
            foreach (int num in queue.DequeueAll())
            {
                Console.Write(" {0}", num);
            }
        };

        Random seedRng = new Random();
        for (int i = 0; i < numProducers; i++)
        {
            new Thread(produce).Start(seedRng.Next());
        }

        for (int i = 0; i < numConsumers; i++)
        {
            new Thread(consume).Start();
        }

        Console.ReadKey(true);
        queue.SignalShutDown();

    }
}
街道布景 2024-09-03 03:48:56

我更喜欢使用回调来启动一个处理线程,该线程会继续运行直到被捕获,并且锁定会导致同时读取器和写入器排队等待:

public delegate void CallbackDelegate();

class Program
{
    static void Main(string[] args)
    {
        Queue<object> items = new Queue<object>();

        Processor processor = new Processor(items);
        Adder adder = new Adder(items, new CallbackDelegate(processor.CallBack));

        Thread addThread = new Thread(new ParameterizedThreadStart(adder.AddItem));
        object objectToAdd = new object();
        addThread.Start(objectToAdd);
    }
}

class Processor
{
    Queue<object> items;

    public Processor(Queue<object> itemsArg)
    {
        items = itemsArg;
    }

    public void ProcessQueue()
    {
        lock (items)
        {
            while (items.Count > 0)
            {
                object real = items.Dequeue();
                // process real
            }
        }
    }

    public void CallBack()
    {
        Thread processThread = new Thread(ProcessQueue);
        processThread.IsBackground = true;
        processThread.Start();
    }
}

class Adder
{
    Queue<object> items;
    CallbackDelegate callback;

    public Adder(Queue<object> itemsArg, CallbackDelegate callbackArg)
    {
        items = itemsArg;
        callback = callbackArg;
    }

    public void AddItem(object o)
    {
        lock (items) { items.Enqueue(o); }
        callback();
    }
}

I prefer to use a callback that launches a processing thread that continues until it's caught up, with locks causing simultaneous readers and writers to wait in line:

public delegate void CallbackDelegate();

class Program
{
    static void Main(string[] args)
    {
        Queue<object> items = new Queue<object>();

        Processor processor = new Processor(items);
        Adder adder = new Adder(items, new CallbackDelegate(processor.CallBack));

        Thread addThread = new Thread(new ParameterizedThreadStart(adder.AddItem));
        object objectToAdd = new object();
        addThread.Start(objectToAdd);
    }
}

class Processor
{
    Queue<object> items;

    public Processor(Queue<object> itemsArg)
    {
        items = itemsArg;
    }

    public void ProcessQueue()
    {
        lock (items)
        {
            while (items.Count > 0)
            {
                object real = items.Dequeue();
                // process real
            }
        }
    }

    public void CallBack()
    {
        Thread processThread = new Thread(ProcessQueue);
        processThread.IsBackground = true;
        processThread.Start();
    }
}

class Adder
{
    Queue<object> items;
    CallbackDelegate callback;

    public Adder(Queue<object> itemsArg, CallbackDelegate callbackArg)
    {
        items = itemsArg;
        callback = callbackArg;
    }

    public void AddItem(object o)
    {
        lock (items) { items.Enqueue(o); }
        callback();
    }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文