生产者/消费者,流缓冲区问题
我正在尝试编写一个管理 3 个流的缓冲区管理器。典型的用法是使用慢速生产者和快速消费者。三个缓冲区背后的想法是生产者始终有一个缓冲区可供写入,而消费者始终获取生成的最新数据。
现在我已经有了这个,并且可以正常工作。
namespace YariIfStream
{
/// <summary>
/// A class that manages three buffers used for IF data streams
/// </summary>
public class YariIFStream
{
private Stream writebuf; ///<value>The stream used for writing</value>
private Stream readbuf; ///<value>The stream used for reading</value>
private Stream swapbuf; ///<value>The stream used for swapping</value>
private bool firsttime; ///<value>Boolean used for checking if it is the first time a writebuffers is asked</value>
private Object sync; ///<value>Object used for syncing</value>
/// <summary>
/// Initializes a new instance of the Yari.YariIFStream class with expandable buffers
/// </summary>
public YariIFStream()
{
sync = new Object();
eerste = true;
writebuf = new MemoryStream();
readbuf = new MemoryStream();
swapbuf = new MemoryStream();
}
/// <summary>
/// Returns the stream with the buffer with new data ready to be read
/// </summary>
/// <returns>Stream</returns>
public Stream GetReadBuffer()
{
lock (sync)
{
Monitor.Wait(sync);
Stream tempbuf = swapbuf;
swapbuf = readbuf;
readbuf = tempbuf;
}
return readbuf;
}
/// <summary>
/// Returns the stream with the buffer ready to be written with data
/// </summary>
/// <returns>Stream</returns>
public Stream GetWriteBuffer()
{
lock (sync)
{
Stream tempbuf = swapbuf;
swapbuf = writebuf;
writebuf = tempbuf;
if (!firsttime)
{
Monitor.Pulse(sync);
}
else
{
firsttime = false;
}
}
//Thread.Sleep(1);
return writebuf;
}
}
}
使用首次检查是因为第一次询问写入缓冲区时,它无法向消费者发出脉冲,因为缓冲区仍然必须写入数据。当第二次询问写入缓冲区时,我们可以确定前一个缓冲区包含数据。
我有两个线程,一个生产者和一个消费者。 这是我的输出:
prod: uv_hjd`alv cons: N/<]g[)8fV
prod: N/<]g[)8fV cons: 5Ud*tJ-Qkv
prod: 5Ud*tJ-Qkv cons: 4Lx&Z7qqjA
prod: 4Lx&Z7qqjA cons: kjUuVyCa.B
prod: kjUuVyCa.B
现在消费者落后一档没关系,它应该这样做。 如您所见,我丢失了第一串数据,这是我的主要问题。
其他问题是:
- 如果我删除第一次检查,它就会起作用。但我认为不应该......
- 如果我添加 Thread.Sleep(1);在 GetWriteBuffer() 中它也可以工作。有件事我不明白。
预先感谢您的任何启发。
I'm trying to write a buffermanager that manages 3 Streams. The typical usage would be with a slow producer and a fast consumer. The idea behind the three buffers is that the producer ALWAYS has a buffer to write in and the consumer ALWAYS gets the latest data produced.
Now i already have this, and it sort-off works.
namespace YariIfStream
{
/// <summary>
/// A class that manages three buffers used for IF data streams
/// </summary>
public class YariIFStream
{
private Stream writebuf; ///<value>The stream used for writing</value>
private Stream readbuf; ///<value>The stream used for reading</value>
private Stream swapbuf; ///<value>The stream used for swapping</value>
private bool firsttime; ///<value>Boolean used for checking if it is the first time a writebuffers is asked</value>
private Object sync; ///<value>Object used for syncing</value>
/// <summary>
/// Initializes a new instance of the Yari.YariIFStream class with expandable buffers
/// </summary>
public YariIFStream()
{
sync = new Object();
eerste = true;
writebuf = new MemoryStream();
readbuf = new MemoryStream();
swapbuf = new MemoryStream();
}
/// <summary>
/// Returns the stream with the buffer with new data ready to be read
/// </summary>
/// <returns>Stream</returns>
public Stream GetReadBuffer()
{
lock (sync)
{
Monitor.Wait(sync);
Stream tempbuf = swapbuf;
swapbuf = readbuf;
readbuf = tempbuf;
}
return readbuf;
}
/// <summary>
/// Returns the stream with the buffer ready to be written with data
/// </summary>
/// <returns>Stream</returns>
public Stream GetWriteBuffer()
{
lock (sync)
{
Stream tempbuf = swapbuf;
swapbuf = writebuf;
writebuf = tempbuf;
if (!firsttime)
{
Monitor.Pulse(sync);
}
else
{
firsttime = false;
}
}
//Thread.Sleep(1);
return writebuf;
}
}
}
The firsttime check is used because the first time a writebuffer is asked, it can not pulse the consumer because the buffer still has to be written with data. When a writebuffer is asked a second time, we can be sure the previous buffer contains data.
I have two threads, one producer and one consumer.
This is my output:
prod: uv_hjd`alv cons: N/<]g[)8fV
prod: N/<]g[)8fV cons: 5Ud*tJ-Qkv
prod: 5Ud*tJ-Qkv cons: 4Lx&Z7qqjA
prod: 4Lx&Z7qqjA cons: kjUuVyCa.B
prod: kjUuVyCa.B
Now it's ok the consumer lags one behind, it is supposed to do that.
As you can see i lose my first string of data wich is my main problem.
The other problems are this:
- if i remove the firsttime check, it works. But it shouldn't in my opinion...
- if i add a Thread.Sleep(1); in the GetWriteBuffer() it also works. Something i don't understand.
Thanks in advance for any enlightenment.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
我已经解决了我的问题。我用 byte[] 替换了所有 Stream 实例。现在效果很好。
不知道为什么 Stream 不起作用,不想花更多时间来解决这个问题。
这是针对遇到相同问题的任何人的新代码。
I've fixed my problem. I replaced all the Stream instances with byte[]. Now it works fine.
Don't know why Stream would not work, do not want to spent more time figuring this out.
Here's the new code for anyone running into the same problem.