我将如何在 Silverlight 中使用反应式扩展 (Rx) 组织这些调用?

发布于 2024-09-15 03:07:25 字数 2072 浏览 0 评论 0原文

我有一些必须按顺序执行的调用。考虑一个具有 Query 和 Load 方法的 IService。查询提供了小部件列表,负载提供了“默认”小部件。因此,我的服务看起来像这样。

void IService.Query(Action<IEnumerable<Widget>,Exception> callback);
void IService.Load(Action<Widget,Exception> callback); 

考虑到这一点,这里是视图模型的粗略草图:

public class ViewModel : BaseViewModel
{
   public ViewModel()
   {
      Widgets = new ObservableCollection<Widget>();

      WidgetService.Query((widgets,exception) =>
      {
          if (exception != null) 
          {
              throw exception;
          }

          Widgets.Clear();

          foreach(var widget in widgets)
          {
             Widgets.Add(widget);
          }

          WidgetService.Load((defaultWidget,ex) =>
          {
             if (ex != null)
             {
                 throw ex;
             }
             if (defaultWidget != null)
             {
                CurrentWidget = defaultWidget;
             }
          }
      });
   }

   public IService WidgetService { get; set; } // assume this is wired up

   public ObservableCollection<Widget> Widgets { get; private set; }

   private Widget _currentWidget; 

   public Widget CurrentWidget 
   {
      get { return _currentWidget; }
      set 
      {
         _currentWidget = value; 
         RaisePropertyChanged(()=>CurrentWidget);
      }
   }
}

我想做的是简化调用查询然后调用默认值的顺序工作流程。也许最好的方法是嵌套 lambda 表达式,如我所示,但我认为 Rx 可能有更优雅的方法。我不想为了 Rx 而使用 Rx,但如果它可以让我组织上面的逻辑,以便更容易在方法中阅读/维护,我会利用它。理想情况下,类似:

Observable.Create(
   ()=>firstAction(), 
   ()=>secondAction())
.Subscribe(action=>action(),error=>{ throw error; }); 

使用强大的线程库,我会做类似的事情:

Service.Query(list=>{result=list};
yield return 1;
ProcessList(result);
Service.Query(widget=>{defaultWidget=widget};
yield return 1;
CurrentWidget = defaultWidget;

这使得工作流程是连续的并消除嵌套(产量是异步枚举器的一部分,并且是阻塞直到结果出现的边界)变得更加明显后退)。

任何类似的事情对我来说都是有意义的。

所以问题的本质是:我是否试图将方钉装入圆孔中,或者是否有办法使用 Rx 重新定义嵌套异步调用?

I have some calls that must execute sequentially. Consider an IService that has a Query and a Load method. The Query gives a list of widgets, and the load provides a "default" widget. Hence, my service looks like this.

void IService.Query(Action<IEnumerable<Widget>,Exception> callback);
void IService.Load(Action<Widget,Exception> callback); 

With that in mind, here is a rough sketch of the view model:

public class ViewModel : BaseViewModel
{
   public ViewModel()
   {
      Widgets = new ObservableCollection<Widget>();

      WidgetService.Query((widgets,exception) =>
      {
          if (exception != null) 
          {
              throw exception;
          }

          Widgets.Clear();

          foreach(var widget in widgets)
          {
             Widgets.Add(widget);
          }

          WidgetService.Load((defaultWidget,ex) =>
          {
             if (ex != null)
             {
                 throw ex;
             }
             if (defaultWidget != null)
             {
                CurrentWidget = defaultWidget;
             }
          }
      });
   }

   public IService WidgetService { get; set; } // assume this is wired up

   public ObservableCollection<Widget> Widgets { get; private set; }

   private Widget _currentWidget; 

   public Widget CurrentWidget 
   {
      get { return _currentWidget; }
      set 
      {
         _currentWidget = value; 
         RaisePropertyChanged(()=>CurrentWidget);
      }
   }
}

What I'd like to do is simplify the sequential workflow of calling query and then the default. Perhaps the best way to do this is nested with lambda expressions as I've shown, but I figured there may be a more elegant way with Rx. I don't want to use Rx for the sake of Rx, but if it can allow me to organize the logic above so it is easier to read/maintain in the method, I'll take advantage of it. Ideally, something like:

Observable.Create(
   ()=>firstAction(), 
   ()=>secondAction())
.Subscribe(action=>action(),error=>{ throw error; }); 

With the power threading library, I'd do something like:

Service.Query(list=>{result=list};
yield return 1;
ProcessList(result);
Service.Query(widget=>{defaultWidget=widget};
yield return 1;
CurrentWidget = defaultWidget;

That makes it far more evident that the workflow is sequential and eliminates nesting (the yields are part of the async enumerator and are boundaries that block until the results come back).

Anything similar would make sense to me.

So the essence of the question: am I trying to fit a square peg into a round hole, or is there a way to redefine the nested asynchronous calls using Rx?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

哥,最终变帅啦 2024-09-22 03:07:25

您可以转换服务方法,以便它们返回 IObservable,而不是将回调作为参数。在这种情况下,可以使用 SelectMany 来实现顺序工作流,类似这样...

        WidgetService.Query()
            .SelectMany(
                widgets =>
                {
                    Widgets.Clear();
                    foreach (var w in widgets)
                    {
                        Widgets.Add(w);
                    }

                    return WidgetService.Load();
                }
            )
            .Do(
                defaultWidget =>
                {
                    if (defaultWidget != null)
                        Default = defaultWidget;
                }
            )
            .Subscribe(
                _ => { },
                e => { throw e; }
            );

但是,IMO F# 异步看起来会更加清晰(在示例中,我假设服务方法分别返回 Async> 和 Async)。请注意,示例没有考虑哪个线程正在修改数据字段,在实际代码中,您应该注意这一点:

    let load = async {
            let! widgets = WidgetService.Query()

            Widgets.Clear()
            for w in widgets do
                Widgets.Add(w)

            let! defaultWidget = WidgetService.Load()
            if defaultWidget <> null then
                Default <- defaultWidget

            return ()
        }

    Async.StartWithContinuations(
        load, 
        ignore, // success continuation - ignore result
        raise,  // error continuation - reraise exception
        ignore  // cancellation continuation - ignore
        )

EDITED

事实上,可以将技术与您在您提到的迭代器一起使用问题:

    private IEnumerable<IObservable<object>> Intialize()
    {
        var widgetsList = WidgetService.Query().Start();
        yield return widgetsList;

        Widgets.Clear();
        foreach (var w in widgetsList[0])
        {
            Widgets.Add(w);
        }

        var defaultWidgetList = WidgetService.Load().Start();
        yield return defaultWidgetList;

        if (defaultWidgetList[0] != null)
            Default = defaultWidgetList[0];
    }

    Observable
        .Iterate(Intialize)
        .Subscribe(
        _ => { },
        ex => { throw ex; }
        );

You can convert methods of service so they return IObservable instead of taking callback as a parameter. In this case sequential workflow can be implemented using SelectMany, something like this...

        WidgetService.Query()
            .SelectMany(
                widgets =>
                {
                    Widgets.Clear();
                    foreach (var w in widgets)
                    {
                        Widgets.Add(w);
                    }

                    return WidgetService.Load();
                }
            )
            .Do(
                defaultWidget =>
                {
                    if (defaultWidget != null)
                        Default = defaultWidget;
                }
            )
            .Subscribe(
                _ => { },
                e => { throw e; }
            );

However IMO F# asyncs will look much more clear (in sample I assume that methods of service returns Async> and Async respectively). Note that sample doesn't take in account what thread is modifying data fields, in real-world code you should pay attention to this:

    let load = async {
            let! widgets = WidgetService.Query()

            Widgets.Clear()
            for w in widgets do
                Widgets.Add(w)

            let! defaultWidget = WidgetService.Load()
            if defaultWidget <> null then
                Default <- defaultWidget

            return ()
        }

    Async.StartWithContinuations(
        load, 
        ignore, // success continuation - ignore result
        raise,  // error continuation - reraise exception
        ignore  // cancellation continuation - ignore
        )

EDITED

In fact it is possible to use technique with iterators you mentioned in your question:

    private IEnumerable<IObservable<object>> Intialize()
    {
        var widgetsList = WidgetService.Query().Start();
        yield return widgetsList;

        Widgets.Clear();
        foreach (var w in widgetsList[0])
        {
            Widgets.Add(w);
        }

        var defaultWidgetList = WidgetService.Load().Start();
        yield return defaultWidgetList;

        if (defaultWidgetList[0] != null)
            Default = defaultWidgetList[0];
    }

    Observable
        .Iterate(Intialize)
        .Subscribe(
        _ => { },
        ex => { throw ex; }
        );
莫相离 2024-09-22 03:07:25

您也可以使用 ReactiveXaml 来执行此操作,但由于您的 CurrentWidget 和 Widgets 都是可变的,因此您不能使其干净(有一个名为 ObservableAsPropertyHelper 它将根据 IObservable 更新属性并触发 RaisePropertyChanged):

public class ViewModel
{
    public ViewModel()
    {
        // These return a Func that wraps an async call in an IObservable<T>
        // that always yields only one item (the result of the call)
        var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery);
        var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad);

        // Create a new command 
        QueryAndLoad = new ReactiveAsyncCommand();

        // QueryAndLoad fires every time someone calls ICommand.Execute
        // The .Do is the hacky part, for sync calls it's hidden by RegisterAsyncFunction
        var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable())
                                        .Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit()));

        // Query up the Widgets 
        async_results.Subscribe(x => x.Run(Widgets.Add));

        // Now execute the Load
        async_results.SelectMany(_ => LoadAsObservable())
                     .Subscribe(x => CurrentWidget = x);

        QueryAndLoad.Execute();
    }

    public ReactiveAsyncCommand QueryAndLoad {get; private set; }

    public ObservableCollection<Widget> Widgets {get; private set; }

    public Widget CurrentWidget {get; set; }
}

You could also do this using ReactiveXaml, though since your CurrentWidget and Widgets are both mutable, you can't make it as clean (there's a class called ObservableAsPropertyHelper which will update a property based on an IObservable and fire the RaisePropertyChanged):

public class ViewModel
{
    public ViewModel()
    {
        // These return a Func that wraps an async call in an IObservable<T>
        // that always yields only one item (the result of the call)
        var QueryAsObservable = Observable.FromAsyncCommand<IEnumerable<Widget>>(WebService.BeginQuery, WebService.EndQuery);
        var LoadAsObservable = Observable.FromAsyncCommand<Widget>(WebService.BeginLoad, WebService.EndLoad);

        // Create a new command 
        QueryAndLoad = new ReactiveAsyncCommand();

        // QueryAndLoad fires every time someone calls ICommand.Execute
        // The .Do is the hacky part, for sync calls it's hidden by RegisterAsyncFunction
        var async_results = QueryAndLoad.SelectMany(_ => QueryAsObservable())
                                        .Do(_ => DoTranslate.AsyncCompletedNotification.OnNext(new Unit()));

        // Query up the Widgets 
        async_results.Subscribe(x => x.Run(Widgets.Add));

        // Now execute the Load
        async_results.SelectMany(_ => LoadAsObservable())
                     .Subscribe(x => CurrentWidget = x);

        QueryAndLoad.Execute();
    }

    public ReactiveAsyncCommand QueryAndLoad {get; private set; }

    public ObservableCollection<Widget> Widgets {get; private set; }

    public Widget CurrentWidget {get; set; }
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文