自定义挂钩订阅可观察到的数组,并从每个数组中获取最新值

发布于 01-22 22:31 字数 1237 浏览 3 评论 0原文

我有一个可观察到的流$,当关联的异步操作完成时,每次都会发出一个值。我想在数组arr中汇总结果,其中arr [i] = undefined如果stream> streams $ [i]代码>尚未完成,并且流$ [i]的解决值(如果有),因此对于3个可观察到的钩子,挂钩应返回以下内容:

[undefined, undefined, undefined] // 1
['A', undefined, undefined] // 2
['A', undefined, 'C'] // 3
['A', 'B', 'C'] // 4
// done, unsubscribe

这是我当前拥有的:

const useLatest = <T>(streams$: Observables<T>[]) => {
   const [state, setState] = useState<T[]>(Array(streams$.length).fill(undefined));
   const latest$ = combineLatest(streams$.map($ => $.pipe(startWith(undefined))));
   
   useEffect(() => {
      const subscription = latest$.subscribe((values) => setState(values));
      return () => {
          subscription.unsubscribe();
      }
   }, []);
   return state;   
}

这给了我靠近正确的返回值(打印所有未定义的未定义两次),但是由于空依赖性数组,如果流$成为新的可观察到的不同数组,则不会重新计算。我尝试修复此操作的其他事情导致的无限排放[未定义,未定义,未定义] /代码>。使用Piping 诸如最新$take take具有第二个参数,或将单个streams $ [i]输送到,等等。

I have an array of Observables streams$ which each emit a value once, when the associated async operation completes. I want to aggregate the results in an array arr, where arr[i] = undefined if streams$[i] hasn't completed, and the resolved value of streams$[i] if it has, so for 3 observables, the hook should return the following:

[undefined, undefined, undefined] // 1
['A', undefined, undefined] // 2
['A', undefined, 'C'] // 3
['A', 'B', 'C'] // 4
// done, unsubscribe

This is what I have currently:

const useLatest = <T>(streams$: Observables<T>[]) => {
   const [state, setState] = useState<T[]>(Array(streams$.length).fill(undefined));
   const latest$ = combineLatest(streams$.map($ => $.pipe(startWith(undefined))));
   
   useEffect(() => {
      const subscription = latest$.subscribe((values) => setState(values));
      return () => {
          subscription.unsubscribe();
      }
   }, []);
   return state;   
}

This gives me close to the correct return values (prints all undefined twice), but due to the empty dependency array will not recompute if streams$ becomes a different array of new observables. Other things I've done to try and fix this either result in infinite emissions of either [undefined, undefined, undefined] or ['A', 'B', 'C']. Using things like piping latest$ to takeWhile with second parameter true, or piping the individual streams$[i] to take(2), etc.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

七色彩虹2025-01-29 22:31:29

解决解决方案

不确定为什么您可以将stream作为依赖关系。我从未使用过React(或React钩子),但是我已经阅读了基础知识。看起来您希望每当有一个新的流阵列(通过参考)时,您希望效果清理和重新运行。如果我正确理解,则依赖性数组通过参考平等起作用。

const useLatest = <T>(streams: Observable<T>[]) => {
  const [state, setState] = useState<T[]>(Array(streams.length).fill(undefined));
  
  useEffect(() => {
    const sub = combineLatest(
      streams.map($ => $.pipe(
        startWith(undefined)
      ))
    ).pipe(
      filter(latest => !latest.every(v => v === undefined))
    ).subscribe((values) => setState(values));

    return sub.unsubscribe.bind(sub);
    
  }, [streams]);

  return state;   
}

其他一些操作员正在采取行动:p

类似的操作人员应该可以工作。

const useLatest = <T>(streams: Observable<T>[]) => {
  const defaultState = () => Array(streams.length).fill(undefined);
  const [state, setState] = useState<T[]>(defaultState());
  
  useEffect(() => {
    const sub = merge(...streams.map((s,i) => s.pipe(
      map(v => ({i,v}))
    ))).pipe(
      scan((acc, {i,v}) => {
        acc[i] = v;
        return acc;
      }, defaultState())
    ).subscribe((values) => setState(values));

    return sub.unsubscribe.bind(sub);
    
  }, [streams]);

  return state;   
}

Toward a solution

Not sure why you can put streams as a dependency. I've never used react (or react hooks), but I've read up on the basics. Looks like you'll want the effect to clean up and re-run whenever you've got a new streams array, (by reference). If I understand correctly, the dependency array works by reference equality.

const useLatest = <T>(streams: Observable<T>[]) => {
  const [state, setState] = useState<T[]>(Array(streams.length).fill(undefined));
  
  useEffect(() => {
    const sub = combineLatest(
      streams.map($ => $.pipe(
        startWith(undefined)
      ))
    ).pipe(
      filter(latest => !latest.every(v => v === undefined))
    ).subscribe((values) => setState(values));

    return sub.unsubscribe.bind(sub);
    
  }, [streams]);

  return state;   
}

Some other operators in action :P

Something like this should work the same.

const useLatest = <T>(streams: Observable<T>[]) => {
  const defaultState = () => Array(streams.length).fill(undefined);
  const [state, setState] = useState<T[]>(defaultState());
  
  useEffect(() => {
    const sub = merge(...streams.map((s,i) => s.pipe(
      map(v => ({i,v}))
    ))).pipe(
      scan((acc, {i,v}) => {
        acc[i] = v;
        return acc;
      }, defaultState())
    ).subscribe((values) => setState(values));

    return sub.unsubscribe.bind(sub);
    
  }, [streams]);

  return state;   
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文