使用withlatest作为我的可管道操作员的一部分时的种族条件

发布于 2025-01-18 01:57:30 字数 580 浏览 6 评论 0原文

我有以下遭受种族条件的代码。有时我可以看到结果,有时我无法:

const op1 = ()=>{
    const filesObs = from(['a','b','c']).pipe(delay(200))

    return (obs)=>{
        return obs
        .pipe(delay(100))
        .pipe(withLatestFrom(filesObs))
    }
}

from([1,2,3,4,5]).pipe(op1()).subscribe(console.log);

因为我看不到任何印刷的东西。但是,如果我将第二个延迟增加到300,我会看到预期值:

[ 1, 'c' ]
[ 2, 'c' ]
[ 3, 'c' ]
[ 4, 'c' ]
[ 5, 'c' ]

有没有办法通过使用observeonsisscribeon 我的代码上的某个地方或我应该我应该查看结果。遵循另一个最佳实践?

I have the following code that suffers from a race condition. Sometimes I can see the results, sometimes I cannot:

const op1 = ()=>{
    const filesObs = from(['a','b','c']).pipe(delay(200))

    return (obs)=>{
        return obs
        .pipe(delay(100))
        .pipe(withLatestFrom(filesObs))
    }
}

from([1,2,3,4,5]).pipe(op1()).subscribe(console.log);

As it is I don't see anything printed. But If I increase the 2nd delay to 300 I see the expected values:

[ 1, 'c' ]
[ 2, 'c' ]
[ 3, 'c' ]
[ 4, 'c' ]
[ 5, 'c' ]

Is there a way to always see the result by using observeOn or subscribeOn somewhere on my code or should I follow another best practice?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

千仐 2025-01-25 01:57:30

首先,这不是 在操作员内的特定问题。
操作员内部未使用的代码也没有打印任何内容(面对相同的问题):

const filesObs = from(['a','b','c']).pipe(delay(200));
from([1,2,3,4,5]).pipe(delay(100),withLatestFrom(filesObs)).subscribe(console.log);

根据我们需要获得字母流的最后一个元素的所需输出,并将其与每个值配对数字流。但是,withlatestfrom()将获得的是每个时间点的最后发射元素。为了证明这一点,请考虑在字母流的发射元素(第一行)之间添加一些延迟。

//adding a delay on each of the emitted letters.
const filesObs = from(['a','b','c']).pipe(concatMap((v)=>of(v).pipe(delay(50))))
from([1,2,3,4,5]).pipe(delay(100),withLatestFrom(filesObs)).subscribe(console.log);
[ 1, 'a' ]
[ 2, 'a' ]
[ 3, 'a' ]
[ 4, 'a' ]
[ 5, 'a' ]

如您所见,以上不是所需的输出。

另外,我不确定它是否是RXJS错误,但是withlatestfrom()如果在可观察到的争论中尚未发出任何内容,则会跳过该值。请参阅下面它如何跳过第一个数字(因为目前发出它,filesobs>)尚未发出。

const filesObs = from(['a','b','c']).pipe(concatMap((v)=>of(v).pipe(delay(50))))
//Now adding a delay on each of the emitted numbers.
from([1,2,3,4,5]).pipe(concatMap((v)=>of(v).pipe(delay(25)))).pipe(withLatestFrom(filesObs)).subscribe(console.log);
[ 2, 'a' ]
[ 3, 'a' ]
[ 4, 'b' ]
[ 5, 'b' ]

解决

问题的解决方案是获得字母流的元素的last() repot() it。然后将每个编号映射到first() filesobs的元素,现在始终是最后一个('c'):

const filesObs = from(['a','b','c']).pipe(delay(200), last(), repeat())
from([1,2,3,4,5]).pipe(delay(100)).pipe(mergeMap(v=>filesObs.pipe(first(),map(v2=>[v2,v])))).subscribe(console.log);

员中相同:

const op1 = ()=>{
    const filesObs = from(['a','b','c']).pipe(delay(200), last(), repeat())

    return (obs)=>{
        return obs
        .pipe(delay(100))
        .pipe(mergeMap(v=>filesObs.pipe(first(),map(v2=>[v2,v]))))
    }
}

from([1,2,3,4,5]).pipe(op1()).subscribe(console.log);

在操作 以上将输出以下值,独立于延迟值:

[ 'c', 1 ]
[ 'c', 2 ]
[ 'c', 3 ]
[ 'c', 4 ]
[ 'c', 5 ]

First of all this is not an issue specific to withLatestFrom being inside an operator.
The code below that is not used inside an operator also does not print anything (faces the same issue):

const filesObs = from(['a','b','c']).pipe(delay(200));
from([1,2,3,4,5]).pipe(delay(100),withLatestFrom(filesObs)).subscribe(console.log);

According to the provided desired output in the question we need to get the last element of the letters stream and pair it with each value from the numbers stream. But what withLatestFrom() will get is the last emitted element at each point in time. To justify this, consider adding some delay between the emitted elements of the letters stream (1st line).

//adding a delay on each of the emitted letters.
const filesObs = from(['a','b','c']).pipe(concatMap((v)=>of(v).pipe(delay(50))))
from([1,2,3,4,5]).pipe(delay(100),withLatestFrom(filesObs)).subscribe(console.log);
[ 1, 'a' ]
[ 2, 'a' ]
[ 3, 'a' ]
[ 4, 'a' ]
[ 5, 'a' ]

As you can see the above is not the desired output.

Also, I am not sure if it is an rxjs bug, but withLatestFrom() will skip the value if there is nothing emitted yet on the observable arguement. See below how it skips the first number (because at the moment it is emits it, nothing has been emitted yet on filesObs).

const filesObs = from(['a','b','c']).pipe(concatMap((v)=>of(v).pipe(delay(50))))
//Now adding a delay on each of the emitted numbers.
from([1,2,3,4,5]).pipe(concatMap((v)=>of(v).pipe(delay(25)))).pipe(withLatestFrom(filesObs)).subscribe(console.log);
[ 2, 'a' ]
[ 3, 'a' ]
[ 4, 'b' ]
[ 5, 'b' ]

solution

One solution to the problem is to get the last() element of the letters stream and repeat() it. Then map each number with the first() element of filesObs, which will now always be the last one ('c'):

const filesObs = from(['a','b','c']).pipe(delay(200), last(), repeat())
from([1,2,3,4,5]).pipe(delay(100)).pipe(mergeMap(v=>filesObs.pipe(first(),map(v2=>[v2,v])))).subscribe(console.log);

And the same inside an operator:

const op1 = ()=>{
    const filesObs = from(['a','b','c']).pipe(delay(200), last(), repeat())

    return (obs)=>{
        return obs
        .pipe(delay(100))
        .pipe(mergeMap(v=>filesObs.pipe(first(),map(v2=>[v2,v]))))
    }
}

from([1,2,3,4,5]).pipe(op1()).subscribe(console.log);

Both of the above will output the below value, independent to the delay values:

[ 'c', 1 ]
[ 'c', 2 ]
[ 'c', 3 ]
[ 'c', 4 ]
[ 'c', 5 ]
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文