可以使用反应式框架重构这段代码吗?

发布于 2024-09-14 09:30:42 字数 2675 浏览 5 评论 0原文

将以下代码复制粘贴到新的 C# 控制台应用程序中。

class Program
{
    static void Main(string[] args)
    {
        var enumerator = new QueuedEnumerator<long>();
        var listenerWaitHandle = Listener(enumerator);

        Publisher(enumerator);
        listenerWaitHandle.WaitOne();
    }

    private static AutoResetEvent Listener(IEnumerator<long> items)
    {
        var @event = new AutoResetEvent(false);
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (items.MoveNext())
            {
                Console.WriteLine("Received : " + items.Current);
                Thread.Sleep(2 * 1000);
            }
            (o as AutoResetEvent).Set();
        }, @event);
        return @event;
    }

    private static void Publisher(QueuedEnumerator<long> enumerator)
    {
        for (int i = 0; i < 10; i++)
        {
            enumerator.Set(i);
            Console.WriteLine("Sended : " + i);
            Thread.Sleep(1 * 1000);
        }
        enumerator.Finish();
    }

    class QueuedEnumerator<T> : IEnumerator<T>
    {
        private Queue _internal = Queue.Synchronized(new Queue());
        private T _current;
        private bool _finished;
        private AutoResetEvent _setted = new AutoResetEvent(false);

        public void Finish()
        {
            _finished = true;
            _setted.Set();
        }

        public void Set(T item)
        {
            if (_internal.Count > 3)
            {
                Console.WriteLine("I'm full, give the listener some slack !");
                Thread.Sleep(3 * 1000);
                Set(item);
            }
            else
            {
                _internal.Enqueue(item);
                _setted.Set();
            }
        }

        public T Current
        {
            get { return _current; }
        }

        public void Dispose()
        {
        }


        object System.Collections.IEnumerator.Current
        {
            get { return _current; }
        }

        public bool MoveNext()
        {
            if (_finished && _internal.Count == 0)
                return false;
            else if (_internal.Count > 0)
            {
                _current = (T)_internal.Dequeue();
                return true;
            }
            else
            {
                _setted.WaitOne();
                return MoveNext();
            }
        }

        public void Reset()
        {
        }
    }
}

2 个线程 (A,B)

一个线程一次可以提供一个实例并调用 Set 方法 B 线程想要接收一系列实例(由线程 A 提供),

因此从字面上将 Add(item)、Add(item), .. 转换为不同线程之间的 IEnumerable

当然也欢迎其他解决方案!

copy paste the following code in new C# console app.

class Program
{
    static void Main(string[] args)
    {
        var enumerator = new QueuedEnumerator<long>();
        var listenerWaitHandle = Listener(enumerator);

        Publisher(enumerator);
        listenerWaitHandle.WaitOne();
    }

    private static AutoResetEvent Listener(IEnumerator<long> items)
    {
        var @event = new AutoResetEvent(false);
        ThreadPool.QueueUserWorkItem((o) =>
        {
            while (items.MoveNext())
            {
                Console.WriteLine("Received : " + items.Current);
                Thread.Sleep(2 * 1000);
            }
            (o as AutoResetEvent).Set();
        }, @event);
        return @event;
    }

    private static void Publisher(QueuedEnumerator<long> enumerator)
    {
        for (int i = 0; i < 10; i++)
        {
            enumerator.Set(i);
            Console.WriteLine("Sended : " + i);
            Thread.Sleep(1 * 1000);
        }
        enumerator.Finish();
    }

    class QueuedEnumerator<T> : IEnumerator<T>
    {
        private Queue _internal = Queue.Synchronized(new Queue());
        private T _current;
        private bool _finished;
        private AutoResetEvent _setted = new AutoResetEvent(false);

        public void Finish()
        {
            _finished = true;
            _setted.Set();
        }

        public void Set(T item)
        {
            if (_internal.Count > 3)
            {
                Console.WriteLine("I'm full, give the listener some slack !");
                Thread.Sleep(3 * 1000);
                Set(item);
            }
            else
            {
                _internal.Enqueue(item);
                _setted.Set();
            }
        }

        public T Current
        {
            get { return _current; }
        }

        public void Dispose()
        {
        }


        object System.Collections.IEnumerator.Current
        {
            get { return _current; }
        }

        public bool MoveNext()
        {
            if (_finished && _internal.Count == 0)
                return false;
            else if (_internal.Count > 0)
            {
                _current = (T)_internal.Dequeue();
                return true;
            }
            else
            {
                _setted.WaitOne();
                return MoveNext();
            }
        }

        public void Reset()
        {
        }
    }
}

2 threads (A,B)

A thread can provide one instance at a time and calls the Set method
B thread wants to receive a sequence of instances (provided by thread A)

So literally transforming an Add(item), Add(item), .. to a IEnumerable between different threads

Other solutions also welcome of course!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

人疚 2024-09-21 09:30:42

当然 - 这段代码可能不是最好的方法,但这是我的初步尝试:

Subject<Item> toAddObservable;
ListObservable<Item> buffer;

void Init()
{
    // Subjects are an IObservable we can trigger by-hand, they're the 
    // mutable variables of Rx
    toAddObservable = new Subject(Scheduler.TaskPool);

    // ListObservable will hold all our items until someone asks for them
    // It will yield exactly *one* item, but only when toAddObservable
    // is completed.
    buffer = new ListObservable<Item>(toAddObservable);
}

void Add(Item to_add)
{
    lock (this) {
        // Subjects themselves are thread-safe, but we still need the lock
        // to protect against the reset in FetchResults
        ToAddOnAnotherThread.OnNext(to_add);
    }
}

IEnumerable<Item> FetchResults()
{
    IEnumerable<Item> ret = null;
    buffer.Subscribe(x => ret = x);

    lock (this) {
        toAddObservable.OnCompleted();
        Init();     // Recreate everything
    }

    return ret;
}

Sure - this code might not be the best way to do it, but here was my initial stab at it:

Subject<Item> toAddObservable;
ListObservable<Item> buffer;

void Init()
{
    // Subjects are an IObservable we can trigger by-hand, they're the 
    // mutable variables of Rx
    toAddObservable = new Subject(Scheduler.TaskPool);

    // ListObservable will hold all our items until someone asks for them
    // It will yield exactly *one* item, but only when toAddObservable
    // is completed.
    buffer = new ListObservable<Item>(toAddObservable);
}

void Add(Item to_add)
{
    lock (this) {
        // Subjects themselves are thread-safe, but we still need the lock
        // to protect against the reset in FetchResults
        ToAddOnAnotherThread.OnNext(to_add);
    }
}

IEnumerable<Item> FetchResults()
{
    IEnumerable<Item> ret = null;
    buffer.Subscribe(x => ret = x);

    lock (this) {
        toAddObservable.OnCompleted();
        Init();     // Recreate everything
    }

    return ret;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文