在可观察的RXJS中管理内部状态

发布于 2025-02-13 12:21:48 字数 840 浏览 2 评论 0原文

我是RXJ的新手,想知道内部状态。

我们的用例是返回初始状态(具有IDS的项目列表)和可观察到将突变(已删除ID)的可观察到的承诺。然后,更新的项目列表是一个持久状态,应用于构建结果。

我可以使用一个全局变量将一些东西侵入,但正如预期的那样,它的行为很奇怪:

import { combineLatest, from } from "rxjs";
import { startWith } from "rxjs/operators";

...

const initialObservable = from(initialPromise);

const mutationObservableWithStart = mutationObservable.pipe(startWith(null));

const globalDeletedIds: string[] = [];

return combineLatest([initialObservable, mutationObservableWithStart]).pipe(
  map(([items, mutation]) => {
    if (mutation && mutation.deleted) {
      globalDeletedIds.push(mutation.id);
    }

    const currentItems = items.filter(doesNotIncludeIds(globalDeletedIds));

    return buildResult(currentItems);
  })
);

我的问题是:如何在可观察到的内部状态正确管理内部状态?

I'm new to RxJS and wondering about internal state.

Our use case is a promise that returns an initial state (a list of items with ids) and an observable that streams mutations (deleted ids) to that state. The updated list of items is then a persistent state and should be used in building the result.

I can hack something together using a global variable but as expected it behaves weirdly:

import { combineLatest, from } from "rxjs";
import { startWith } from "rxjs/operators";

...

const initialObservable = from(initialPromise);

const mutationObservableWithStart = mutationObservable.pipe(startWith(null));

const globalDeletedIds: string[] = [];

return combineLatest([initialObservable, mutationObservableWithStart]).pipe(
  map(([items, mutation]) => {
    if (mutation && mutation.deleted) {
      globalDeletedIds.push(mutation.id);
    }

    const currentItems = items.filter(doesNotIncludeIds(globalDeletedIds));

    return buildResult(currentItems);
  })
);

My question is: how to correctly manage internal state in an observable?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

自由范儿 2025-02-20 12:21:48

如果我理解问题是正确的,我会这样继续(Inline评论尝试解释逻辑),

// first we start the stream with the initial list of items (an array notified
// by the initialObservable)
initialObservable.pipe(
  // once initialObservable notifies the list we switch to another observable
  switchMap(items => {
    // the stream we switch to starts with the stream of mutations
    return mutationObservable.pipe(
      // we need to notify something since the very beginning, not only from
      // when the first mutation occurs, therefore we use startWith to say
      // that the new stream starts immediately notifying a null (assuming 
      // there is no null key in items)
      startWith(null),
      // since we need to keep a state but also to notify the state any time 
      // an event from upstream (i.e. a mutation) occurs, then we use the
      // scan operator which holds an internal state which is notified any time
      // upstream notifies something
      scan((state, m) => {
        // the state, i.e. the list of items, gets updated according to the
        //  mutations received from upstream
        state = state.filter(item => item.id !== m)
        // scan returns a new state
        return state
      }, items)  // the state is initialized with the items received
    )
  })
)

您可以看一下此stackblitz 示例。

If I understand the problem right, I would proceed like this (inline comments try to explain the logic)

// first we start the stream with the initial list of items (an array notified
// by the initialObservable)
initialObservable.pipe(
  // once initialObservable notifies the list we switch to another observable
  switchMap(items => {
    // the stream we switch to starts with the stream of mutations
    return mutationObservable.pipe(
      // we need to notify something since the very beginning, not only from
      // when the first mutation occurs, therefore we use startWith to say
      // that the new stream starts immediately notifying a null (assuming 
      // there is no null key in items)
      startWith(null),
      // since we need to keep a state but also to notify the state any time 
      // an event from upstream (i.e. a mutation) occurs, then we use the
      // scan operator which holds an internal state which is notified any time
      // upstream notifies something
      scan((state, m) => {
        // the state, i.e. the list of items, gets updated according to the
        //  mutations received from upstream
        state = state.filter(item => item.id !== m)
        // scan returns a new state
        return state
      }, items)  // the state is initialized with the items received
    )
  })
)

You can take a look at this stackblitz for an example.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文