适当处理间歇性错误和可观察的方法

发布于 2025-02-13 12:05:30 字数 4737 浏览 0 评论 0原文

我正在使用graphql.net客户端来订阅远程服务上的数据。客户端返回可观察的可观察,因此,当创建订阅时,如预期的那样,您会在onnext中接收新消息,并在onerror < /代码>。 GraphQL客户端具有的能力自动重新连接初始连接是否失败或已建立的连接下降。

我知道,按照惯例,任何消息OnError is at 都终止消息的顺序。但是,以某种方式他们能够继续发送到onNextonerror之后,第一次onerror。我尝试阅读代码,但令人困惑。 可观察的和i 可疑似乎有多个嵌套他们在遇到错误时创建了 new 序列。

为了澄清我的问题,假设我有以下伪event基于包装器类。

public class PubSubSubscription() {
   ...
   public void CreateSubscription<TResponse>(string topic) {
      // GraphQL client
      var stream = client 
  .CreateSubscriptionStream<FixConnectionChangedSubscriptionResult>(...);

      stream
         .Subscribe(
            response => {
               // Do stuff with incoming data (validation, mapping, logging, etc.)

               // send it on the UI
               DataReceived?.Invoke(this, new DataReceivedEventArgs { Message = response });
            },
            ex => {
               // ******************************
               // Note that the Observable created by CreateSubscriptionStream()
               // will call `onError` over-and-over since it _seems_ like it is
               // creating (and re-creating) nested Observables in its own
               // classes. In the event of an initial connection failure or
               // re-connect it will raise an error and then automatically
               // try again.
               // ******************************

               // send it on to UI
               ErrorDetected?.Invoke(this, new ErrorDetectedEventArgs { Exception = ex });
            });
   }
   ...
}

然后,我将其称为如下(或足够近)...

...
var orders = ordersPubSub.CreateSubscription("/orders");

orders.DataReceived += OnDataReceived;
orders.ErrorDetected += OnErrorDetected;

void OnErrorDetected(object sender, ErrorDetectedEventArgs e) {
   // Can be called multiple times
   
   // Display message in UI
}
...

我在将基于事件的包装器方法转换为可观察的包装器方法时遇到了麻烦。

public class PubSubSubscription() {
   ...
   public IObservable<TResponse> CreateSubscription<TResponse>(string topic) {
      // Observable that I give back to my UI
      var eventSubject = new Subject<TResponse>();

      // GraphQL client
      var stream = client
      .CreateSubscriptionStream<FixConnectionChangedSubscriptionResult>(...);

      stream
         .Subscribe(
            response => {
               // Do stuff with incoming data (validation, mapping, logging, etc.)

               // send it on the UI
               eventSubject.onNext(response);
            },
            ex => {
               // ******************************
               // Note that the Observable created by CreateSubscriptionStream()
               // will call `onError` over-and-over since it _seems_ like it is
               // creating (and re-creating) nested Observables in its own
               // classes. In the event of an initial connection failure or
               // re-connect it will raise an error and then automatically
               // try again.
               // ******************************

               // send it on to UI
               eventSubject.onError(ex);
            });

      return eventSubject.AsObservable();
   }
   ...
}

然后,我将其称为如下(或足够近)...

...
var orders = ordersPubSub.CreateSubscription("/orders");

orders
   // Things I have tried...

   // Do() by itself does not stop the exception from hitting onError (which makes sense)
   .Do(
      _ => { },
      ex => // display in UI)
   // Retry() seems to cause the GraphQL subscription to "go away" because I no longer see connection attempts
   .Retry()
   // Stops the exception from hitting onError but the sequence still stops since I need to return _something_ from this method
   .Catch(() => {
      // display in UI
      
      return Observable.Empty<T>();  
   })
   .Subscribe(
      msg => // do something with data,
      ex => // display in UI);
}
...

底线是处理可以“暂时中断”序列的正确方法?

我也不确定将重试责任推向观察者的想法。这意味着我需要每次调用createSubscription()来复制逻辑。但是,如果我将移动到 createSubscription()方法中,我仍然不确定如何让观察者知道中断发生,以便可以更新UI。

我正在使用的一种方法(在将其阅读为可能的解决方案之后)是将我的Tresponse包装在“伪造” subscriptionResponse&lt; tresponse&gt;中值和异常错误属性,因此外部可观察的onnext呼叫。然后,在我的subscribe中,我添加if/else逻辑以检查错误>错误是非编号并做出相应反应的。但这只是丑陋...我几乎想回到使用事件...

I am using the GraphQL.NET client to subscribe to data on a remote service. The client returns an Observable so when the subscription is created you, as expected, receive new messages in onNext and get errors (both initial connection errors, reconnection errors, and anything else) in onError. The GraphQL client has the ability to automatically reconnect if the initial connection fails or when an established connection drops.

I know that by convention, any messages coming in on onError is supposed to terminate the sequence of messages. However, somehow they are able to continue sending to onNext and onError after that first onError. I have tried reading through the code but it is confusing. There seems to be multiple nesting of Observable and I suspect they are creating a new sequence when they encounter an error.

To clarify my issue, suppose I had the following pseudo Event based wrapper class.

public class PubSubSubscription() {
   ...
   public void CreateSubscription<TResponse>(string topic) {
      // GraphQL client
      var stream = client 
  .CreateSubscriptionStream<FixConnectionChangedSubscriptionResult>(...);

      stream
         .Subscribe(
            response => {
               // Do stuff with incoming data (validation, mapping, logging, etc.)

               // send it on the UI
               DataReceived?.Invoke(this, new DataReceivedEventArgs { Message = response });
            },
            ex => {
               // ******************************
               // Note that the Observable created by CreateSubscriptionStream()
               // will call `onError` over-and-over since it _seems_ like it is
               // creating (and re-creating) nested Observables in its own
               // classes. In the event of an initial connection failure or
               // re-connect it will raise an error and then automatically
               // try again.
               // ******************************

               // send it on to UI
               ErrorDetected?.Invoke(this, new ErrorDetectedEventArgs { Exception = ex });
            });
   }
   ...
}

I would then call it as follows (or close enough)...

...
var orders = ordersPubSub.CreateSubscription("/orders");

orders.DataReceived += OnDataReceived;
orders.ErrorDetected += OnErrorDetected;

void OnErrorDetected(object sender, ErrorDetectedEventArgs e) {
   // Can be called multiple times
   
   // Display message in UI
}
...

I am having trouble converting that event-based wrapper approach to an Observable wrapper approach.

public class PubSubSubscription() {
   ...
   public IObservable<TResponse> CreateSubscription<TResponse>(string topic) {
      // Observable that I give back to my UI
      var eventSubject = new Subject<TResponse>();

      // GraphQL client
      var stream = client
      .CreateSubscriptionStream<FixConnectionChangedSubscriptionResult>(...);

      stream
         .Subscribe(
            response => {
               // Do stuff with incoming data (validation, mapping, logging, etc.)

               // send it on the UI
               eventSubject.onNext(response);
            },
            ex => {
               // ******************************
               // Note that the Observable created by CreateSubscriptionStream()
               // will call `onError` over-and-over since it _seems_ like it is
               // creating (and re-creating) nested Observables in its own
               // classes. In the event of an initial connection failure or
               // re-connect it will raise an error and then automatically
               // try again.
               // ******************************

               // send it on to UI
               eventSubject.onError(ex);
            });

      return eventSubject.AsObservable();
   }
   ...
}

This I would then call it as follows (or close enough)...

...
var orders = ordersPubSub.CreateSubscription("/orders");

orders
   // Things I have tried...

   // Do() by itself does not stop the exception from hitting onError (which makes sense)
   .Do(
      _ => { },
      ex => // display in UI)
   // Retry() seems to cause the GraphQL subscription to "go away" because I no longer see connection attempts
   .Retry()
   // Stops the exception from hitting onError but the sequence still stops since I need to return _something_ from this method
   .Catch(() => {
      // display in UI
      
      return Observable.Empty<T>();  
   })
   .Subscribe(
      msg => // do something with data,
      ex => // display in UI);
}
...

Bottom line is what is the proper approach to dealing with sequences that can be "temporarily interrupted"?

I am also unsure of the idea of pushing the responsibility of retries onto the observer. This means that I would need to duplicate the logic each time CreateSubscription() is called. Yet, if I move it into the CreateSubscription() method, I am still unsure how to let the observer know the interruption happened so the UI can be updated.

One approach I am playing with (after reading about it as a possible solution) is to wrap my TResponse in a "fake" SubscriptionResponse<TResponse> which has T Value and Exception Error properties so the outer Observable only has onNext called. Then in my Subscribe I add if/else logic to check if Error is non-null and react accordingly. But this just feels ugly... I would almost want to go back to using events...

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

冷情 2025-02-20 12:05:30

如果您有一个不守规矩的可观察到的 - 遇到多个错误的一个错误 - 您可以这样做:

IObservable<int> unruly = ...;

IObservable<Notification<int>> workable =
    unruly
        .Materialize();

solethipize操作员将iObservable&lt; int&gt;变成<<代码> iObservable&lt; notification&lt; int&gt; 其中on ComppletedOnErrorOnNext消息都将转换为OnNext消息,您可以这样检查:

“

现在您可以在没有序列结束的情况下处理错误。清除它们后,您可以使用Dematerializize像这样还原序列:

IObservable<int> ruly =
    workable
        .Where(x => x.Kind != NotificationKind.OnError)
        .Dematerialize();

If you have an unruly observable - one that produces multiple errors without ended - you can make it workable by doing this:

IObservable<int> unruly = ...;

IObservable<Notification<int>> workable =
    unruly
        .Materialize();

The Materialize operator turns the IObservable<int> into an IObservable<Notification<int>> where the OnCompleted, OnError, and OnNext messages all get converted to OnNext messages that you can inspect like this:

screenshot

Now you can deal with the errors without the sequence ending. When you've cleared them you can restore the sequence with Dematerialize like so:

IObservable<int> ruly =
    workable
        .Where(x => x.Kind != NotificationKind.OnError)
        .Dematerialize();
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文