带定时器的观察者模式

发布于 2024-11-01 04:23:19 字数 4401 浏览 1 评论 0原文

我在我的应用程序中使用了观察者模式。

我有一个主题,其中有一个名为 'tmr'System.Timers.Timer 对象。此计时器的tick 事件每60 秒后触发。在这次勾选事件中,我将通知所有与我的主题相关的观察者。我使用了 for 循环来迭代我的观察者列表和列表。然后触发观察者更新方法。

假设我有 10 名观察员关注我的主题。

每个观察者需要 10 秒来完成其处理。

现在,在 for 循环中完成通知会导致最后一个观察者的 Update 方法在 90 秒后被调用。即下一个观察者更新方法仅在前一个观察者完成其处理后才被调用。

但这不是我在申请中想要的。我需要在计时器滴答发生时立即触发所有观察者 Update 方法。这样就没有观察者需要等待。我希望这可以通过线程来完成。

所以,我修改了代码,

// Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }

但是我心里有两个疑问,

  1. 如果有10个观察者 我的计时器间隔是 60 秒 那么语句 new Thread() 将触发 600 次。

    是否高效并建议在每个计时器滴答处创建新线程?

  2. 如果我的观察者花费太多时间来完成他们的更新逻辑(即超过 60 秒)怎么办?意味着计时器滴答发生在观察者更新之前。我如何控制这个?

我可以发布示例代码..如果需要的话...

我使用的代码..

using System;
using System.Collections.Generic;
using System.Timers;
using System.Text;
using Threading = System.Threading;
using System.ComponentModel;

namespace singletimers
{
  class Program
  {


    static void Main(string[] args)
    {
      DataPullerSubject.Instance.Attach(Observer1.Instance);
      DataPullerSubject.Instance.Attach(Observer2.Instance);
      Console.ReadKey();
    }
  }

  public sealed class DataPullerSubject
  {
    private static volatile DataPullerSubject instance;
    private static object syncRoot = new Object();
    public static DataPullerSubject Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new DataPullerSubject();
          }
        }

        return instance;
      }
    }

    int interval = 10 * 1000;
    Timer tmr;
    private List<Observer> _observers = new List<Observer>();

    DataPullerSubject()
    {
      tmr = new Timer();
      tmr.Interval = 1; // first time to call instantly
      tmr.Elapsed += new ElapsedEventHandler(tmr_Elapsed);
      tmr.Start();
    }

    public void Attach(Observer observer)
    {
      _observers.Add(observer);
    }

    public void Detach(Observer observer)
    {
      _observers.Remove(observer);
    }

    // Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }

    private void tmr_Elapsed(object source, ElapsedEventArgs e)
    {
      tmr.Interval = interval;
      tmr.Stop(); // stop the timer until all notification triggered
      this.Notify();
      tmr.Start();//start again
    }
  }


  public abstract class Observer
  {
    string data;
    public abstract void Update();
    public virtual void GetDataFromDBAndSetToDataSet(string param)
    {
      Console.WriteLine("Processing for: " + param);
      data = param + new Random().Next(1, 2000);
      Threading.Thread.Sleep(10 * 1000);//long work
      Console.WriteLine("Data set for: " + param);
    }
  }


  public sealed class Observer1 : Observer
  {
    private static volatile Observer1 instance;
    private static object syncRoot = new Object();
    public static Observer1 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer1();
          }
        }

        return instance;
      }
    }
    Observer1()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet("Observer1");
    }

  }

  public sealed class Observer2 : Observer
  {
    private static volatile Observer2 instance;
    private static object syncRoot = new Object();
    public static Observer2 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer2();
          }
        }

        return instance;
      }
    }
    Observer2()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet("Observer2");
    }

  }
}

谢谢&亲切的问候。

I have used Observer Pattern for my application.

I have a subject which has one System.Timers.Timer object in it named 'tmr'. The tick event of this timer fires after every 60 seconds. On this tick event I will notify all my observers that are attached to my subject. I have used a for-loop to iterate throught my Observers List & then fire the Observers Update method.

Assume i have 10 observers attached to my subject.

Each observer takes 10 seconds to complete its processing.

Now notification being done in a for-loop causes the last Observer's Update method to be called after 90seconds. i.e. Next Observer Update method is called only after previous one has completed its processing.

But this is not what i wanted in my application. I need all my observers Update method to be fired instantly when the timer tick occurs. So that no observer has to wait. I hope this can be done by Threading.

So, I modified code to,

// Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }

But I have two doubts in my mind,

  1. If there are 10 observers
    And my timer interval is 60 seconds
    Then the statement new Thread() will fire 600 times.

    Is it efficient and recommended to create new threads on every timer tick ?

  2. What if my observers are taking too much time to complete their update logic i.e. goes more than 60seconds. Means the timer tick occurs before the observers are updated. How can i control this ?

I can post sample code.. if required...

The code i used..

using System;
using System.Collections.Generic;
using System.Timers;
using System.Text;
using Threading = System.Threading;
using System.ComponentModel;

namespace singletimers
{
  class Program
  {


    static void Main(string[] args)
    {
      DataPullerSubject.Instance.Attach(Observer1.Instance);
      DataPullerSubject.Instance.Attach(Observer2.Instance);
      Console.ReadKey();
    }
  }

  public sealed class DataPullerSubject
  {
    private static volatile DataPullerSubject instance;
    private static object syncRoot = new Object();
    public static DataPullerSubject Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new DataPullerSubject();
          }
        }

        return instance;
      }
    }

    int interval = 10 * 1000;
    Timer tmr;
    private List<Observer> _observers = new List<Observer>();

    DataPullerSubject()
    {
      tmr = new Timer();
      tmr.Interval = 1; // first time to call instantly
      tmr.Elapsed += new ElapsedEventHandler(tmr_Elapsed);
      tmr.Start();
    }

    public void Attach(Observer observer)
    {
      _observers.Add(observer);
    }

    public void Detach(Observer observer)
    {
      _observers.Remove(observer);
    }

    // Fires the updates instantly
    public void Notify()
    {
      foreach (Observer o in _observers)
      {
        Threading.Thread oThread = new Threading.Thread(o.Update);
        oThread.Name = o.GetType().Name;
        oThread.Start();
      }
    }

    private void tmr_Elapsed(object source, ElapsedEventArgs e)
    {
      tmr.Interval = interval;
      tmr.Stop(); // stop the timer until all notification triggered
      this.Notify();
      tmr.Start();//start again
    }
  }


  public abstract class Observer
  {
    string data;
    public abstract void Update();
    public virtual void GetDataFromDBAndSetToDataSet(string param)
    {
      Console.WriteLine("Processing for: " + param);
      data = param + new Random().Next(1, 2000);
      Threading.Thread.Sleep(10 * 1000);//long work
      Console.WriteLine("Data set for: " + param);
    }
  }


  public sealed class Observer1 : Observer
  {
    private static volatile Observer1 instance;
    private static object syncRoot = new Object();
    public static Observer1 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer1();
          }
        }

        return instance;
      }
    }
    Observer1()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet("Observer1");
    }

  }

  public sealed class Observer2 : Observer
  {
    private static volatile Observer2 instance;
    private static object syncRoot = new Object();
    public static Observer2 Instance
    {
      get
      {
        if (instance == null)
        {
          lock (syncRoot)
          {
            if (instance == null)
              instance = new Observer2();
          }
        }

        return instance;
      }
    }
    Observer2()
    {
    }
    public override void Update()
    {
      base.GetDataFromDBAndSetToDataSet("Observer2");
    }

  }
}

Thanks & kind regards.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

冷︶言冷语的世界 2024-11-08 04:23:19
  • 不鼓励使用new Thread。使用 TaskTask
  • 您创建 Observable 模式框架的最佳尝试可能只会接近 Rx。使用可以解决您提到的问题的方法(即,如果处理花费太多时间)。 Rx 将为您提供定义可观察场景的巨大灵活性。
  • Using new Thread is discouraged. Use Task or Task<T>
  • Your best attempt at creating an Observable pattern framework will probably only come close to Rx. Use that which solves the problems you mentioned (i.e. if processing takes too much time). Rx will give you huge flexibility in defining your observable scenarios.
夕色琉璃 2024-11-08 04:23:19

1) 您可以通过 ThreadPool.QueueUserWorkItem 或者您可以使用 任务

2) 您必须同步您的方法

1) You can use Threads from the ThreadPool via ThreadPool.QueueUserWorkItem or you can use Tasks

2) You have to synchronize your Methods.

我只土不豪 2024-11-08 04:23:19

或者,观察者可以以非阻塞方式实现更新。
也就是说,Update 总是立即返回。然后,观察者对象有责任在必要时在新线程中执行其工作。

我不确定这对您的情况是否有帮助 - 我不知道您的“观察者”是什么,但也许您也不知道?

Alternatively, the observers could implement Update in a non-blocking way.
That is, Update always returns immediately. Then it is the responsibility of the Observer objects to perform their work in a new thread if necessary.

I'm not sure if this helps in your scenario - I don't know what your 'Observers' are, but then maybe you don't know either?

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文