实现异步“从流中读取所有当前可用的数据”手术

发布于 2024-10-08 20:25:28 字数 4233 浏览 1 评论 0原文

我最近提供了这个问题的答案:C# - 实时控制台输出重定向

正如经常发生的那样,解释东西(这里的“东西”是我解决类似问题的方式)可以让你更好地理解和/或,就像这里的情况一样,“哎呀”时刻。我意识到我的解决方案在实施时有一个错误。这个错误没有什么实际意义,但它对我作为一个开发人员来说却非常重要:我不能高枕无忧地知道我的代码有可能崩溃。

消除错误是这个问题的目的。对于这么长的介绍,我深表歉意,所以让我们开始吧。

我想构建一个类,允许我从控制台的标准输出 Stream 接收输入。控制台输出流的类型为FileStream;如果需要,实现可以转换为该内容。还有一个已存在的关联 StreamReader 可供利用。

我只需要在此类中实现一件事即可实现我想要的功能:异步“读取此刻可用的所有数据”操作。读取到流的末尾是不可行的,因为除非进程关闭控制台输出句柄,否则流不会结束,并且它不会这样做,因为它是交互式的并且在继续之前等待输入。

我将使用假设的异步操作来实现基于事件的通知,这对我的调用者来说会更方便。

该类的公共接口是这样的:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}

StartSendingEventsStopSendingEvents 执行它们所宣传的操作;为了讨论的目的,我们可以假设事件总是被发送而不失一般性。

该类在内部使用这两个字段:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

该类的功能在以下方法中实现。为了让事情顺利进行:

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

要在不阻塞且不需要回车符的情况下从 Stream 中读取数据,需要调用 BeginRead

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

具有挑战性的部分:

BeginRead 需要使用缓冲区。这意味着从流中读取时,可读取的字节(“传入块”)可能大于缓冲区。 请记住,这里的目标是读取所有块并为每个块调用一次事件订阅者

为此,如果 EndRead 后缓冲区已满,我们不会立即将其内容发送给订阅者,而是将其附加到 StringBuilder 中。仅当没有更多内容可从流中读取时,StringBuilder 的内容才会被发送回。

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

在任何不足以填充缓冲区的读取之后(在这种情况下我们知道在上次读取操作期间没有更多数据可读取),所有累积的数据都将发送到订阅者:(

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

我知道只要有没有订阅者,数据会永远累积,这是一个经过深思熟虑的决定)。

好处

这个方案几乎完美地工作:

  • 异步功能,无需产生任何线程,
  • 对调用代码非常方便(只需订阅一个事件)
  • 每次数据不会有多个事件可供读取
  • 几乎不知道缓冲区大小

坏处

最后一个几乎是一个非常大的缓冲区。考虑当传入块的长度恰好等于缓冲区大小时会发生什么。该块将被读取并缓冲,但事件不会被触发。接下来将是一个 BeginRead ,它希望找到属于当前块的更多数据,以便将其全部发送回,但是......流中将不再有数据。

事实上,只要将数据以长度恰好等于缓冲区大小的块放入流中,数据就会被缓冲,并且事件永远不会被触发。

这种情况不太可能发生在实践中会发生这种情况,特别是因为我们可以选择任意数字作为缓冲区大小,但问题就在那里。

解决方案?

不幸的是,在检查了FileStreamStreamReader上的可用方法后,我找不到任何可以让我窥视流的东西允许在其上使用异步方法。

一种“解决方案”是在检测到“缓冲区已满”情况后让线程等待 ManualResetEvent。如果在很短的时间内没有发出事件信号(通过异步回调),那么流中的更多数据将不会到来,并且到目前为止积累的数据应该发送给订阅者。然而,这引入了对另一个线程的需要,需要线程同步,并且很不优雅。

BeginRead 指定超时也足够了(时不时地回调我的代码,这样我就可以检查是否有数据要发回;大多数时候不会有任何事情可做,所以我预计对性能的影响可以忽略不计)。但 FileStream 似乎不支持超时。

由于我认为带超时的异步调用是裸 Win32 中的一个选项,因此另一种方法可能是使用 PInvoke 来解决问题。但这也是不可取的,因为它会带来复杂性并且会给编码带来痛苦。

有没有一种优雅的方法来解决这个问题?

感谢您有足够的耐心阅读所有这些。

更新:

我在最初的文章中确实没有很好地传达场景。此后我对这篇文章进行了相当多的修改,但要格外确定:

问题是关于如何实现异步“读取此刻可用的所有数据”操作。

我向那些采取了行动的人致歉在我没有明确表达自己的意图的情况下阅读和回答的时间。

I recently provided an answer to this question: C# - Realtime console output redirection.

As often happens, explaining stuff (here "stuff" was how I tackled a similar problem) leads you to greater understanding and/or, as is the case here, "oops" moments. I realized that my solution, as implemented, has a bug. The bug has little practical importance, but it has an extremely large importance to me as a developer: I can't rest easy knowing that my code has the potential to blow up.

Squashing the bug is the purpose of this question. I apologize for the long intro, so let's get dirty.

I wanted to build a class that allows me to receive input from a console's standard output Stream. Console output streams are of type FileStream; the implementation can cast to that, if needed. There is also an associated StreamReader already present to leverage.

There is only one thing I need to implement in this class to achieve my desired functionality: an async "read all the data available this moment" operation. Reading to the end of the stream is not viable because the stream will not end unless the process closes the console output handle, and it will not do that because it is interactive and expecting input before continuing.

I will be using that hypothetical async operation to implement event-based notification, which will be more convenient for my callers.

The public interface of the class is this:

public class ConsoleAutomator {
    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;

    public void StartSendingEvents();
    public void StopSendingEvents();
}

StartSendingEvents and StopSendingEvents do what they advertise; for the purposes of this discussion, we can assume that events are always being sent without loss of generality.

The class uses these two fields internally:

    protected readonly StringBuilder inputAccumulator = new StringBuilder();

    protected readonly byte[] buffer = new byte[256];

The functionality of the class is implemented in the methods below. To get the ball rolling:

    public void StartSendingEvents();
    {
        this.stopAutomation = false;
        this.BeginReadAsync();
    }

To read data out of the Stream without blocking, and also without requiring a carriage return char, BeginRead is called:

    protected void BeginReadAsync()
    {
        if (!this.stopAutomation) {
            this.StandardOutput.BaseStream.BeginRead(
                this.buffer, 0, this.buffer.Length, this.ReadHappened, null);
        }
    }

The challenging part:

BeginRead requires using a buffer. This means that when reading from the stream, it is possible that the bytes available to read ("incoming chunk") are larger than the buffer.
Remember that the goal here is to read all of the chunk and call event subscribers exactly once for each chunk.

To this end, if the buffer is full after EndRead, we don't send its contents to subscribers immediately but instead append them to a StringBuilder. The contents of the StringBuilder are only sent back whenever there is no more to read from the stream.

    private void ReadHappened(IAsyncResult asyncResult)
    {
        var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
        if (bytesRead == 0) {
            this.OnAutomationStopped();
            return;
        }

        var input = this.StandardOutput.CurrentEncoding.GetString(
            this.buffer, 0, bytesRead);
        this.inputAccumulator.Append(input);

        if (bytesRead < this.buffer.Length) {
            this.OnInputRead(); // only send back if we 're sure we got it all
        }

        this.BeginReadAsync(); // continue "looping" with BeginRead
    }

After any read which is not enough to fill the buffer (in which case we know that there was no more data to be read during the last read operation), all accumulated data is sent to the subscribers:

    private void OnInputRead()
    {
        var handler = this.StandardOutputRead;
        if (handler == null) {
            return;
        }

        handler(this, 
                new ConsoleOutputReadEventArgs(this.inputAccumulator.ToString()));
        this.inputAccumulator.Clear();
    }

(I know that as long as there are no subscribers the data gets accumulated forever. This is a deliberate decision).

The good

This scheme works almost perfectly:

  • Async functionality without spawning any threads
  • Very convenient to the calling code (just subscribe to an event)
  • Never more than one event for each time data is available to be read
  • Is almost agnostic to the buffer size

The bad

That last almost is a very big one. Consider what happens when there is an incoming chunk with length exactly equal to the size of the buffer. The chunk will be read and buffered, but the event will not be triggered. This will be followed up by a BeginRead that expects to find more data belonging to the current chunk in order to send it back all in one piece, but... there will be no more data in the stream.

In fact, as long as data is put into the stream in chunks with length exactly equal to the buffer size, the data will be buffered and the event will never be triggered.

This scenario may be highly unlikely to occur in practice, especially since we can pick any number for the buffer size, but the problem is there.

Solution?

Unfortunately, after checking the available methods on FileStream and StreamReader, I can't find anything which lets me peek into the stream while also allowing async methods to be used on it.

One "solution" would be to have a thread wait on a ManualResetEvent after the "buffer filled" condition is detected. If the event is not signaled (by the async callback) in a small amount of time, then more data from the stream will not be forthcoming and the data accumulated so far should be sent to subscribers. However, this introduces the need for another thread, requires thread synchronization, and is plain inelegant.

Specifying a timeout for BeginRead would also suffice (call back into my code every now and then so I can check if there's data to be sent back; most of the time there will not be anything to do, so I expect the performance hit to be negligible). But it looks like timeouts are not supported in FileStream.

Since I imagine that async calls with timeouts are an option in bare Win32, another approach might be to PInvoke the hell out of the problem. But this is also undesirable as it will introduce complexity and simply be a pain to code.

Is there an elegant way to get around the problem?

Thanks for being patient enough to read all of this.

Update:

I definitely did not communicate the scenario well in my initial writeup. I have since revised the writeup quite a bit, but to be extra sure:

The question is about how to implement an async "read all the data available this moment" operation.

My apologies to the people who took the time to read and answer without me making my intent clear enough.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

谎言 2024-10-15 20:25:29

理论上,我同意杰森的观点;在数据块可以被缓冲区整除的情况下,您的实现存在比逻辑漏洞更大的问题。我看到的最大问题是,您的读者必须对文件类型有足够的了解,才能知道它如何将数据分成您的订阅者知道如何处理的“块”。

流对于它们正在接收或发送的内容没有固有的了解;仅它们传输数据的机制。 NetworkStream 可能正在发送 HTML 或 ZIP 文件; FileStream 可能正在读取文本文件或 MP3。拥有这些知识的是阅读器(XmlReader、TextReader、Image.FromStream() 等)。因此,您的异步读取器必须至少了解一些有关数据的信息,但不对该知识进行硬编码会很有用。

为了处理“流”数据,增量发送必须单独有用;你必须对你所得到的东西有足够的了解,你所得到的东西是一个可以单独处理的“块”。我的建议是以封装的方式向您的异步阅读器提供该信息,要么让您的订阅者告诉您,要么通过提供一些与侦听器分开的特定于格式的“块化器”(因为该阅读器正在侦听控制台输出,并且所有听众应该以同样的方式对待它,第二个计划可能会更好)。

逻辑实现:

public class MyStreamManager {
    public delegate bool ValidChunkTester(StringBuilder builder);

    private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
    public event ValidChunkTester IsValidChunk
    { add{validators.Add(value);} remove {validators.Remove(value);}}

    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;


    public void StartSendingEvents();
    public void StopSendingEvents();
}

...

private void ReadHappened(IAsyncResult asyncResult)
{
    var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
    if (bytesRead == 0) {
        this.OnAutomationStopped();
        return;
    }

    var input = this.StandardOutput.CurrentEncoding.GetString(
        this.buffer, 0, bytesRead);
    this.inputAccumulator.Append(input);

    if (validators.Any() && StandardOutputRead !-= null 
            && validators.Aggregate(true, (valid, validator)=>valid && validator(inputAccumulator))) {
        this.OnInputRead(); // send when all listeners can work with the buffer contents
    }

    this.BeginReadAsync(); // continue "looping" with BeginRead
}

...

该模型要求订阅者不能修改 StringBuilder;如果您愿意,您可以提供一些不可变的东西供他们检查。一个示例侦听器可能是:

public bool IsACompleteLine(StringBuilder builder)
{
    return builder.Contains(Environment.NewLine);
}

或:

public bool Contains256Bytes(StringBuilder builder)
{
    return builder.Length >= 256;
}

... 你明白了。确定要释放给侦听器的当前缓冲区的价值的事件在概念上与侦听器本身是分开的,但不必具体如此,因此它将支持单个特定于输出的测试或多个基于侦听器的测试。

In theory, I agree with Jason; your implementation has bigger problems than having a logic hole in the case of a chunk of data being evenly divisible by your buffer. The biggest problem I see is that your reader must have enough knowledge about the file type to know how it can separate data into "chunks" that your subscribers know how to deal with.

Streams have no inherent knowledge about what they're receiving or sending; only the mechanism by which they are transporting the data. A NetworkStream may be sending HTML or a ZIP file; a FileStream may be reading a text file or an MP3. It's the reader (XmlReader, TextReader, Image.FromStream(), etc) that has this knowledge. Therefore, your async reader has to know at least something about the data, but it would be useful not to have that knowledge hard-coded.

In order to work with "streaming" data, incremental sends must be individually useful; you must know enough about what you're getting to know that what you've gotten is a "chunk" that is individually processable. My suggestion is to provide that information to your async reader in an encapsulated fashion, either by having your subscribers tell you, or by providing some format-specific "chunkifier" seperate from the listeners (as this reader is listening to console output, and all listeners should treat it the same way, this second plan may be better).

A logical implementation:

public class MyStreamManager {
    public delegate bool ValidChunkTester(StringBuilder builder);

    private readonly List<ValidChunkTester> validators = new List<ValidChunkTester>();
    public event ValidChunkTester IsValidChunk
    { add{validators.Add(value);} remove {validators.Remove(value);}}

    public event EventHandler<ConsoleOutputReadEventArgs> StandardOutputRead;


    public void StartSendingEvents();
    public void StopSendingEvents();
}

...

private void ReadHappened(IAsyncResult asyncResult)
{
    var bytesRead = this.StandardOutput.BaseStream.EndRead(asyncResult);
    if (bytesRead == 0) {
        this.OnAutomationStopped();
        return;
    }

    var input = this.StandardOutput.CurrentEncoding.GetString(
        this.buffer, 0, bytesRead);
    this.inputAccumulator.Append(input);

    if (validators.Any() && StandardOutputRead !-= null 
            && validators.Aggregate(true, (valid, validator)=>valid && validator(inputAccumulator))) {
        this.OnInputRead(); // send when all listeners can work with the buffer contents
    }

    this.BeginReadAsync(); // continue "looping" with BeginRead
}

...

This model requires that subscribers not modify the StringBuilder; you can provide something immutable for them to examine if you choose. An example listener might be:

public bool IsACompleteLine(StringBuilder builder)
{
    return builder.Contains(Environment.NewLine);
}

or:

public bool Contains256Bytes(StringBuilder builder)
{
    return builder.Length >= 256;
}

... you get the idea. The event determining the worthiness of the current buffer to be released to listeners is conceptually separate from the listeners themselves, but doesn't have to be concretely so, so it will support either a single output-specific test or multiple listener-based tests.

萌酱 2024-10-15 20:25:29

如果您按照所描述的方式从 FileStream 中读取数据,那么将读取底层文件的全部内容。因此,您将只有一个“块”数据,您将一小段地将其读入 StringBuilder(效率有些低)。您的实现中没有任何方法可以将数据分成更小的“块”,因为读取将继续填充缓冲区,直到文件耗尽。在这个抽象级别,只有客户端知道这些块的大小应该是多少,因此您必须将数据交给他们以解码为块。这违背了缓冲区的最初目的。

如果您有某种其他形式的流可以突发传输数据(例如控制台输出或通信数据包),那么您将获得突发数据,但您仍然无法保证读取结束时少于缓冲区满data 意味着您已经到达数据包的末尾,只是传输中有一个暂停。通常在这些情况下,您需要缓冲数据并对其进行处理(即了解数据格式)以确定何时接收到完整的块/数据包。在这种情况下,您的缓冲区中将始终有一个“未完成的块”等待,直到收到更多数据以终止该块或启动一个新块,并将其“推出”缓冲区。这可能是通信中的一个问题,因为下一个数据包可能很长时间不会到达。

因此,最终,您需要让读者了解如何将数据划分为块,这意味着您需要客户端进行解码,这就是为什么基本流类尚未以您的方式传递数据的原因正在努力实施。

那么,通过添加这个中级课程,您将获得什么?最好的情况是,它会给您的 I/O 增加一层额外的复杂性和开销(让我们面对现实,您试图从客户端代码中抽象出的只是几行代码)。最坏的情况是,它将无法按照您的要求将数据分解为块,因此根本没有用处。

请注意“这种情况在实践中可能极不可能发生”:当流式传输大量数据时,您可以放心,即使是“极不可能”的事件也会以相当大的规律性发生 - 当然通常足以让您可以不要假设它们永远不会发生。

[编辑 - 添加]

如果您不寻求概括您的解决方案,那么您可以向轻松处理问题的类添加逻辑。

两种可能的解决方案是:

  • 如果您知道将输出给您的控制台行的最大限制,您可以简单地使用足够大的缓冲区,以保证您的边缘情况永远不会发生。 (例如,CreateProcess 命令限制为 32k,cmd.exe 将命令限制为 8k。您可能会发现类似的限制适用于您接收的数据“块”)

  • 如果您的块始终是行(换行符终止的文本块),然后只需检查缓冲区中的最后一个字符是否看起来像终止符(0x0a 或 0x0d)。如果不是,则需要读取更多数据。

If you read from a FileStream in the manner you have described, then the entire contents of the underlying file will be read. Thus, you will only have one "chunk" of data, which you will read into a StringBuilder (somewhat inefficiently) in tiny bites. Nothing in your implementation gives any way of breaking the data into smaller "chunks", because the read will continue filling your buffer until the file is exhausted. At this level of abstraction, only the client knows what size these chunks should be, so you will have to hand the data over to them to be decoded into chunks. Which defeats the original purpose of your buffer.

If you have some other form of stream that delivers data in bursts (eg. console output or comms packets), then you will get bursts of data, but you still can't guarantee that a read ending with less than a buffer-ful of data means that you have reached the end of a packet, simply that there is a pause in the transmission. Usually in these cases you need to buffer the data and process it (i.e. have knowledger of the data format) to determine when a complete chunk/packet has been received. In this scenario you will always have an "unfinished chunk" waiting in your buffer until some more data is received to terminate the chunk or start a new chunk, and "push it out" of your buffer. This can be a problem in comms where the next packet may not arrive for a long time.

So ultimately, you will need to prime your reader with knowledge of how the data should be divided into chunks, which means you need the client to do the decoding, which is why the base stream classes don't already deliver data in the manner you are trying to implement.

So, by adding this intermediate class, what will you gain? At best it will add an extra layer of complexity and overhead to your I/O (let's face it, what you're trying to abstract out of your client code is only a few lines of code). At worst, it will be unable to break the data into chunks as you require, so will be of no use at all.

Beware "This scenario may be highly unlikely to occur in practice": When streaming large amounts of data you can be assured that even "highly unlikely" events will occur with considerable regularity - certainly often enouhg that you can't assume they will never happen.

[edit - added]

If you are not seeking to generalise your solution, then you can add logic to the class that handles the problem easily.

Two possible solutions are:

  • If you know the maximum limit of the console lines that will be output to you, you can simply use a large enough buffer that you can guarantee your edge case will never occur. (e.g. CreateProcess commands are limited to 32k, cmd.exe limits commands to 8k. You may find similar limits pply to the data "chunks" you are receiving)

  • If your chunks are always lines (newline terminated blocks of text), then simply check if the last character in your buffer looks like a terminator (0x0a or 0x0d). If not, there is more data to read.

她说她爱他 2024-10-15 20:25:29

我倾向于删除“双缓冲”(填充 StringBuilder 的部分,然后在数据满时传递数据),并在读取字节时返回从 Stream 缓冲区接收的数据。因此,在 ReadHappened 中,您将拥有:

if (bytesRead > 0) {
    this.OnInputRead(); // only send back if we 're sure we got it all
}

正如其他人所说,订阅者需要了解有关消息/数据块以及如何将多个部分组合成一个整体的信息。因此,您也可以在收到每个部件时将其退回。如果订阅者是一个“哑订阅者”,它只是充当管道,那么这也可以工作。

I would be inclined to remove the "double buffering" (the part where you fill the StringBuilder then pass the data when it is full) and return the data received from the Stream's buffer whenever bytes are read. So in ReadHappened, you would have:

if (bytesRead > 0) {
    this.OnInputRead(); // only send back if we 're sure we got it all
}

As others have stated the subscriber will need to know something about the message/chunk of data and how to combine multiple parts into one whole. Therefore you may as well return each part as you receive it. If the subscriber is a "dumb subscriber" which simply acts as a conduit this would work too.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文