C# 线程安全的StreamWriter 怎么做呢? 2

发布于 2024-09-16 20:16:47 字数 1695 浏览 9 评论 0原文

所以这是我上一个问题的延续 - 所以问题是 “构建线程安全的程序的最佳方法是什么,因为它需要将双精度值写入文件。如果通过流写入器保存值的函数被多个线程调用?最好的方法是什么? ?”

我修改了MSDN上找到的一些代码,下面怎么样?这个正确地将所有内容写入文件。

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}

So this is a continuation from my last question - So the question was
"What is the best way to build a program that is thread safe in terms that it needs to write double values to a file. If the function that saves the values via streamwriter is being called by multiple threads? Whats the best way of doing it?"

And I modified some code found at MSDN, how about the following? This one correctly writes everything to the file.

namespace SafeThread
{
    class Program
    {
        static void Main()
        {
            Threading threader = new Threading();

            AutoResetEvent autoEvent = new AutoResetEvent(false);

            Thread regularThread =
                new Thread(new ThreadStart(threader.ThreadMethod));
            regularThread.Start();

            ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod),
                autoEvent);

            // Wait for foreground thread to end.
            regularThread.Join();

            // Wait for background thread to end.
            autoEvent.WaitOne();
        }
    }


    class Threading
    {
        List<double> Values = new List<double>();
        static readonly Object locker = new Object();
        StreamWriter writer = new StreamWriter("file");
        static int bulkCount = 0;
        static int bulkSize = 100000;

        public void ThreadMethod()
        {
            lock (locker)
            {
                while (bulkCount < bulkSize)
                    Values.Add(bulkCount++);
            }
            bulkCount = 0;
        }

        public void WorkMethod(object stateInfo)
        {
            lock (locker)
            {
                foreach (double V in Values)
                {
                    writer.WriteLine(V);
                    writer.Flush();
                }
            }
            // Signal that this thread is finished.
            ((AutoResetEvent)stateInfo).Set();
        }
    }
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

暖阳 2024-09-23 20:16:47

ThreadQueueUserWorkItem 是线程可用的最低 API。除非我最终别无选择,否则我不会使用它们。尝试使用 Task 类以获得更高级别的抽象。有关详细信息,请参阅我最近关于该主题的博客文章

您还可以使用 BlockingCollection 作为适当的生产者/消费者队列,而不是尝试使用最低可用的同步 API 手动构建一个队列 >。

正确地重新发明这些轮子是非常困难的。我强烈建议使用专为此类需求设计的类(具体来说,TaskBlockingCollection)。它们内置于 .NET 4.0 框架和 作为 .NET 3.5 的附加组件提供

Thread and QueueUserWorkItem are the lowest available APIs for threading. I wouldn't use them unless I absolutely, finally, had no other choice. Try the Task class for a much higher-level abstraction. For details, see my recent blog post on the subject.

You can also use BlockingCollection<double> as a proper producer/consumer queue instead of trying to build one by hand with the lowest available APIs for synchronization.

Reinventing these wheels correctly is surprisingly difficult. I highly recommend using the classes designed for this type of need (Task and BlockingCollection, to be specific). They are built-in to the .NET 4.0 framework and are available as an add-on for .NET 3.5.

话少心凉 2024-09-23 20:16:47
  • 该代码将 writer 作为实例变量,但使用静态储物柜。如果您有多个实例写入不同的文件,那么它们没有理由需要
  • 在相关注释上共享相同的锁,因为您已经拥有 writer(作为私有实例变量),您可以使用它来锁定而不是使用在这种情况下,单独的储物柜对象 - 这使事情变得更简单。

“正确答案”实际上取决于您在锁定/阻止行为方面寻找的内容。例如,最简单的事情是跳过中间数据结构,只使用 WriteValues 方法,以便每个线程“报告”其结果并将其写入文件。类似于:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}

当然,这意味着工作线程在“报告结果”阶段进行序列化 - 根据性能特征,这可能就很好(例如,生成 5 分钟,写入 500 毫秒)。

另一方面,您可以让工作线程写入数据结构。如果您使用 .NET 4,我建议仅使用 ConcurrentQueue< /a> 而不是这样做锁定自己。

此外,您可能希望以比工作线程报告的批量更大的批量执行文件 I/O,因此您可能选择仅以某种频率在后台线程中进行写入。频谱的这一端看起来像下面这样(您可以在实际代码中删除 Console.WriteLine 调用,这些调用就在那里,这样您就可以看到它正在运行)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
  • the code has the writer as an instance var but using a static locker. If you had multiple instances writing to different files, there's no reason they would need to share the same lock
  • on a related note, since you already have the writer (as a private instance var), you can use that for locking instead of using a separate locker object in this case - that makes things a little simpler.

The 'right answer' really depends on what you're looking for in terms of locking/blocking behavior. For instance, the simplest thing would be to skip the intermediate data structure just have a WriteValues method such that each thread 'reporting' its results goes ahead and writes them to the file. Something like:

StreamWriter writer = new StreamWriter("file");
public void WriteValues(IEnumerable<double> values)
{
    lock (writer)
    {
        foreach (var d in values)
        {
            writer.WriteLine(d);
        }
        writer.Flush();
    }
}

Of course, this means worker threads serialize during their 'report results' phases - depending on the performance characteristics, that may be just fine though (5 minutes to generate, 500ms to write, for example).

On the other end of the spectrum, you'd have the worker threads write to a data structure. If you're in .NET 4, I'd recommend just using a ConcurrentQueue rather than doing that locking yourself.

Also, you may want to do the file i/o in bigger batches than those being reported by the worker threads, so you might choose to just do writing in a background thread on some frequency. That end of the spectrum looks something like the below (you'd remove the Console.WriteLine calls in real code, those are just there so you can see it working in action)

public class ThreadSafeFileBuffer<T> : IDisposable
{
    private readonly StreamWriter m_writer;
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>();
    private readonly Timer m_timer;

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5)
    {
        m_writer = new StreamWriter(filePath);
        var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds);
        m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod);
    }

    public void AddResult(T result)
    {
        m_buffer.Enqueue(result);
        Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count);
    }

    public void Dispose()
    {
        Console.WriteLine("Turning off timer");
        m_timer.Dispose();
        Console.WriteLine("Flushing final buffer output");
        FlushBuffer(); // flush anything left over in the buffer
        Console.WriteLine("Closing file");
        m_writer.Dispose();
    }

    /// <summary>
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock
    /// </summary>
    /// <param name="unused"></param>
    private void FlushBuffer(object unused = null)
    {
        T current;
        while (m_buffer.TryDequeue(out current))
        {
            Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count);
            m_writer.WriteLine(current);
        }
        m_writer.Flush();
    }
}

class Program
{
    static void Main(string[] args)
    {
        var tempFile = Path.GetTempFileName();
        using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile))
        {
            Parallel.For(0, 100, i =>
            {
                // simulate some 'real work' by waiting for awhile
                var sleepTime = new Random().Next(10000);
                Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime);
                Thread.Sleep(sleepTime);
                resultsBuffer.AddResult(Math.PI*i);
            });
        }
        foreach (var resultLine in File.ReadAllLines(tempFile))
        {
            Console.WriteLine("Line from result: {0}", resultLine);
        }
    }
}
山人契 2024-09-23 20:16:47

所以你是说你想要一堆线程使用 StreamWriter 将数据写入单个文件?简单的。只需锁定 StreamWriter 对象即可。

这里的代码将创建 5 个线程。每个线程将执行 5 个“操作”,并且在每个操作结束时,它将向名为“file”的文件写入 5 行。

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ConsoleApplication1 {
    class Program {
        static void Main() {
            StreamWriter Writer = new StreamWriter("file");

            Action<int> ThreadProcedure = (i) => {
                // A thread may perform many actions and write out the result after each action
                // The outer loop here represents the multiple actions this thread will take
                for (int x = 0; x < 5; x++) {
                    // Here is where the thread would generate the data for this action
                    // Well simulate work time using a call to Sleep
                    Thread.Sleep(1000);
                    // After generating the data the thread needs to lock the Writer before using it.
                    lock (Writer) {
                        // Here we'll write a few lines to the Writer
                        for (int y = 0; y < 5; y++) {
                            Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                        }
                    }
                }
            };

            //Now that we have a delegate for the thread code lets make a few instances

            List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
            for (int w = 0; w < 5; w++) {
                AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
            }

            // Wait for all threads to complete
            foreach (IAsyncResult r in AsyncResultList) {
                r.AsyncWaitHandle.WaitOne();
            }

            // Flush/Close the writer so all data goes to disk
            Writer.Flush();
            Writer.Close();
        }
    }
}

结果应该是一个包含 125 行的文件“文件”,其中所有“操作”同时执行,并且每个操作的结果同步写入该文件。

So you're saying you want a bunch of threads to write data to a single file using a StreamWriter? Easy. Just lock the StreamWriter object.

The code here will create 5 threads. Each thread will perform 5 "actions," and at the end of each action it will write 5 lines to a file named "file."

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace ConsoleApplication1 {
    class Program {
        static void Main() {
            StreamWriter Writer = new StreamWriter("file");

            Action<int> ThreadProcedure = (i) => {
                // A thread may perform many actions and write out the result after each action
                // The outer loop here represents the multiple actions this thread will take
                for (int x = 0; x < 5; x++) {
                    // Here is where the thread would generate the data for this action
                    // Well simulate work time using a call to Sleep
                    Thread.Sleep(1000);
                    // After generating the data the thread needs to lock the Writer before using it.
                    lock (Writer) {
                        // Here we'll write a few lines to the Writer
                        for (int y = 0; y < 5; y++) {
                            Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y);
                        }
                    }
                }
            };

            //Now that we have a delegate for the thread code lets make a few instances

            List<IAsyncResult> AsyncResultList = new List<IAsyncResult>();
            for (int w = 0; w < 5; w++) {
                AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null));
            }

            // Wait for all threads to complete
            foreach (IAsyncResult r in AsyncResultList) {
                r.AsyncWaitHandle.WaitOne();
            }

            // Flush/Close the writer so all data goes to disk
            Writer.Flush();
            Writer.Close();
        }
    }
}

The result should be a file "file" with 125 lines in it with all "actions" performed concurrently and the result of each action written synchronously to the file.

难得心□动 2024-09-23 20:16:47

您那里的代码被巧妙地破坏了 - 特别是,如果排队的工作项首先运行,那么它将在终止之前立即刷新(空)值列表,之后您的工作人员将填充列表(这将最终被忽略)。自动重置事件也不执行任何操作,因为没有任何内容查询或等待其状态。

另外,由于每个线程使用不同锁,这些锁没有任何意义!您需要确保每次访问流写入器时都持有一个共享锁。您不需要在刷新代码和生成代码之间加锁;您只需要确保刷新在生成完成后运行即可。

不过,您可能走在正确的轨道上 - 尽管我会使用固定大小的数组而不是列表,并在数组满时刷新数组中的所有条目。如果线程寿命较长,这可以避免内存不足的可能性。

The code you have there is subtly broken - in particular, if the queued work item runs first, then it will flush the (empty) list of values immediately, before terminating, after which point your worker goes and fills up the List (which will end up being ignored). The auto-reset event also does nothing, since nothing ever queries or waits on its state.

Also, since each thread uses a different lock, the locks have no meaning! You need to make sure you hold a single, shared lock whenever accessing the streamwriter. You don't need a lock between the flushing code and the generation code; you just need to make sure the flush runs after the generation finishes.

You're probably on the right track, though - although I'd use a fixed-size array instead of a list, and flush all entries from the array when it gets full. This avoids the possibility of running out of memory if the thread is long-lived.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文