RXJS:我可以根据单个输出的条目顺序交流多个流吗?

发布于 2025-02-13 05:57:43 字数 694 浏览 1 评论 0 原文

这纯粹是一种了解操作员的学术练习。如何使用RXJ交流传入流,

例如从此处进行:

// RxJS v6+
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

对此:

import { of } from 'rxjs';

const source1 = of(1,3,5,7,9);
const source2 = of(2,4,6,8,10);

// a simple merge will just append source1 and source2
// how do I obtain an output = 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2).subscribe(val => console.log(val))

约束:这是一个一般的问题,因此我不想根据此特定示例的奇数/偶数使用谓词,而是要根据传入元素的顺序创建输出,即从第一个元素来看。每个流,然后第二个流,等等

This is purely an academic exercise to understand operators. How can I use RXJS to interleave incoming streams

e.g. go from this:

// RxJS v6+
import { of } from 'rxjs';
//emits any number of provided values in sequence
const source = of(1, 2, 3, 4, 5);
//output: 1,2,3,4,5
const subscribe = source.subscribe(val => console.log(val));

to this:

import { of } from 'rxjs';

const source1 = of(1,3,5,7,9);
const source2 = of(2,4,6,8,10);

// a simple merge will just append source1 and source2
// how do I obtain an output = 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2).subscribe(val => console.log(val))

Constraints: This is a general question so I dont want to use predicates based on the odd/even separation of this specific example, but to create an output based purely on order of incoming elements i.e. take the first from each stream, then the second, etc.

Is that possible?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

神妖 2025-02-20 05:57:43

这样做的标准方法确实是 Merge 函数,可以接收任何数量的函数观察物作为参数,并在序塞中排放所有项目,而不论函数参数中可观察到的顺序。但是,正如您指出的那样,它附加了结果,原因不是合并的问题,而是实际上的noreferrer“> 同步弹出值,因此它们都以相同的JavaScript循环为顺序。您可以使用可观察到值的可观察值来更改该值,或使用一些可用的async “ nofollow noreferrer”>调度程序

assynchronous可观察:

我创建了此 asyncof 函数作为一个模型可以观察到随着时间的推移而填充的值,例如Websocket客户端或用户介绍。

import { merge, Observable, EMPTY } from 'rxjs';
import { toArray } from 'rxjs/operators';

const asyncOf = <T = any>(args: T[], interval = 500): Observable<T> => {
  let i = 0;
  let intervalRef;

  if(!args?.length) {
    return EMPTY;
  }
 
  return new Observable(observer => {
    intervalRef = setInterval(() => {
      if(args[i]) {
        observer.next(args[i])
      }
      ++i;

      if(!args[i]) {
        observer.complete()
        if(intervalRef) {
           clearInterval(intervalRef)
        }
      }
    }, interval)
  })
}

const source1 = asyncOf([1, 3, 5, 7, 9]);
const source2 = asyncOf([2, 4, 6, 8, 10]);

//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
  //.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
  .subscribe((val) => console.log(val));

在这种情况下,合并正常工作,因为两个可观察到的都不同步运行。

异步调度程序

另一个选项是使用异步调度程序之一(或创建自己的)来描述 应填充每个项目时。这更接近您的示例,并且更深入地深入研究RXJ:

import { merge, scheduled, asyncScheduler } from 'rxjs';
import { toArray } from 'rxjs/operators';

const source1 = scheduled([1, 3, 5, 7, 9], asyncScheduler);
const source2 = scheduled([2, 4, 6, 8, 10], asyncScheduler);

//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
  //.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
  .subscribe((val) => console.log(val));

The standard way of doing this is indeed the merge function, which can receive any number of observables as arguments, and emits all of their items in sequece regardless of the order of the observables in the function parameters. But as you pointed out, it appended the results, the reason for this is not an issue with merge, but actually that of emmited the values synchronously, so they all ran in order in the same javascript loop. You can change that by using an observable that emits values assynchronously, or create an observable with some of the available async Schedulers

Assynchronous observable:

I created this asyncOffunction as a mockup to a Observable with values emmited over time, like a websocket client or user interations.

import { merge, Observable, EMPTY } from 'rxjs';
import { toArray } from 'rxjs/operators';

const asyncOf = <T = any>(args: T[], interval = 500): Observable<T> => {
  let i = 0;
  let intervalRef;

  if(!args?.length) {
    return EMPTY;
  }
 
  return new Observable(observer => {
    intervalRef = setInterval(() => {
      if(args[i]) {
        observer.next(args[i])
      }
      ++i;

      if(!args[i]) {
        observer.complete()
        if(intervalRef) {
           clearInterval(intervalRef)
        }
      }
    }, interval)
  })
}

const source1 = asyncOf([1, 3, 5, 7, 9]);
const source2 = asyncOf([2, 4, 6, 8, 10]);

//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
  //.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
  .subscribe((val) => console.log(val));

In this case the merge works correctly, because both observables are running asynchronously.

Async schedulers

Another option is using one of the async schedulers (or create your own) to describe when each item should be emmited. This is closer to your example, and a somewhat deeper dive into rxjs:

import { merge, scheduled, asyncScheduler } from 'rxjs';
import { toArray } from 'rxjs/operators';

const source1 = scheduled([1, 3, 5, 7, 9], asyncScheduler);
const source2 = scheduled([2, 4, 6, 8, 10], asyncScheduler);

//output: 1,2,3,4,5,6,7,8,9,10
const subscribe = merge(source1, source2)
  //.pipe(toArray()) //collect all emmited values and emmit an array when the observable completes
  .subscribe((val) => console.log(val));

Scheduled comes on RXJS 6.5+ and deprecates the use of scheduler function in other functions like of,

终陌 2025-02-20 05:57:43

您可以 zip 流,然后打开这样的zipper缩放值:

const odd$ = of(1, 3, 5, 7, 9).pipe(delay(50));
const even$ = of(2, 4, 6, 8, 10);

zip([odd$, even$])
  .pipe(switchMap(([odd, even]) => of(odd, even)))
  .subscribe(console.log);

stackblitz: https://stackblitz.com/edit/rxjs-uktrkz?devtoolsheight = 60&amp;file=index.ts

You could zip the streams, and then unpack the zipped values like this:

const odd$ = of(1, 3, 5, 7, 9).pipe(delay(50));
const even$ = of(2, 4, 6, 8, 10);

zip([odd$, even$])
  .pipe(switchMap(([odd, even]) => of(odd, even)))
  .subscribe(console.log);

stackblitz: https://stackblitz.com/edit/rxjs-uktrkz?devtoolsheight=60&file=index.ts

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文