RXJS:组合两个可观察的物品,因此仅在第一个可观察的值为true的值时,可观察的可观察到仅发射

发布于 2025-02-10 19:43:16 字数 834 浏览 2 评论 0 原文

我目前正在尝试使用可观察到的简单离线/在线同步机制。

基本上,我有两个可观察到的东西:

  1. 可观察的连接:第一个可观察的可观察到我是否有互联网连接的信息。当网络状态更改
  2. 数据可观察到的数据时,它会发出:第二个可观察的数据具有需要同步的数据。当要同步

我要实现的新数据是结合上述观测值时,它会发出:

  • 只要连接状态为 false ,可观察到的组合不应发出。在这种情况下,只要连接状态为,可观察到的数据应保留其状态
  • 可以观察到的合并可观察到的可观察到的数据应发出。
  • ,如果连接状态从切换到,则 false <代码> true ,它应该在可观察到的数据上的每个值发射

一个小示例,当前使用过滤器和commineLatest可以在此处找到: https://codesandbox.io/s/offline-sync-sync-sync-sync-s5lv49?file=/src/index.js

不幸的是,这根本没有预期。

是否有运营商可以实现所需的行为?作为替代方案,我可以当然可以轮询连接状态,并每x秒发出一次。但是理想情况下,我希望可以清洁可观察到的东西,尽管我对操作员最有意义的东西有些迷失。

要清除这个想法:我需要同步所有数据,而不仅仅是最新数据。因此,可观察到的数据应缓解数据。

I'm currently trying to implement a simple offline/online sync mechanism using observables.

Basically, I have two observables:

  1. Connection observable: The first observable gives me the information whether or not there is an internet connections. It emits when the network state changes
  2. Data observable: The second observable has the data that needs to synced. It emits when there is new data to be synced

What I want to achieve is to combine the above observables so that:

  • As long as the connection state is false, the combined observable shouldn't emit. In this case, the data observable should retain its state
  • As long as the connection state is true, the combined observable should emit every time there is data in the data observable
  • If the connection state switches from false to true, it should emit for every value on the data observable

A small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js

Unfortunately, this doesn't behave as intended at all.

Is there any operator to achieve the required behavior? As an alternative, I could maybe poll the connection status of course and emit every X seconds. But ideally, I'd like a clean combination of Observables, though I'm a bit lost with what operator makes most sense.

To clear the idea up: I need to sync all data, not just the latest. So the data observable should buffer the data.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

拧巴小姐 2025-02-17 19:43:16

看起来好像是正确的道路,可以做以下操作:

combineLatest([
  offlineOnlineSubject,
  dataSubject
])
  .pipe(
    filter(([online, counter]) => online),
  )
  .subscribe(([online, counter]) => {
    syncedIndicator.textContent = counter;
  });

It looks like were on the right path and could do just the following:

combineLatest([
  offlineOnlineSubject,
  dataSubject
])
  .pipe(
    filter(([online, counter]) => online),
  )
  .subscribe(([online, counter]) => {
    syncedIndicator.textContent = counter;
  });
残疾 2025-02-17 19:43:16

因此,当我们处于离线模式时,我将所有发射的数据存储在缓冲区中,一旦我们从 offline 更改为 在线,我简单地在缓冲区中发射所有内容:

let dataBuffer = [];
let isPreviouslyOnline = false;

combineLatest([dataSubject, offlineOnlineSubject])
  .pipe(
    filter(([data, isOnline]) => {
      if (!isOnline) {
        if (!isPreviouslyOnline) {
          dataBuffer.push(data);
        }

        isPreviouslyOnline = false;
        return false;
      }

      return true;
    }),
    switchMap(([data]) => {
      isPreviouslyOnline = true;
      if (dataBuffer.length > 0) {
        const tempData = [...dataBuffer];
        dataBuffer = [];

        return from(tempData);
      } else {
        return of(data);
      }
    })
  )
  .subscribe((data) => {
    console.log("Data is: ", data);
  });

代码沙盒:

我认为它有效,但感觉不佳,试图使用本机 rxjs 缓冲区操作员实现它,但无法弄清楚如何实现。很想看看是否有人有更好/清洁的解决方案。

So when we are in offline mode, I store all the emitted data in a buffer, and once we change from offline to online, I simply emit everything in the buffer:

let dataBuffer = [];
let isPreviouslyOnline = false;

combineLatest([dataSubject, offlineOnlineSubject])
  .pipe(
    filter(([data, isOnline]) => {
      if (!isOnline) {
        if (!isPreviouslyOnline) {
          dataBuffer.push(data);
        }

        isPreviouslyOnline = false;
        return false;
      }

      return true;
    }),
    switchMap(([data]) => {
      isPreviouslyOnline = true;
      if (dataBuffer.length > 0) {
        const tempData = [...dataBuffer];
        dataBuffer = [];

        return from(tempData);
      } else {
        return of(data);
      }
    })
  )
  .subscribe((data) => {
    console.log("Data is: ", data);
  });

Code sandbox: https://codesandbox.io/s/offline-sync-forked-k38kc3?file=/src/index.js

I think it works but it doesn't feel great, was trying to use the native rxjs buffer operator to achieve it but couldn't figure out how. Would love to see if anyone has a better/cleaner solution.

耳钉梦 2025-02-17 19:43:16

在搜索“门”一词之后,我找到了以下堆栈溢出问题并发表:
>条件延迟,以rxjs

基本上,该答案是基本的,该答案是基本的,该答案是基本的。所需的结果。

我在此处更新了一个示例: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357

关键部分是:

const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();

const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));

dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
  console.log("Syncing data", {
    counter
  });

  syncedIndicator.innerHTML += `<li>${counter}</li>`;
});

包裹在自定义类型的操作员中:

import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  const gateTriggerFn = () => gateTrigger.pipe(
    filter((v) => v)
  );

  return (source: Observable<T | null | undefined>) => source.pipe(
    delayWhen(gateTriggerFn)
  );
}

看起来如此这个解决方案迄今正在做我打算做的事情。

After also searching for the term "gate", I found the following stack overflow question and post:
Conditional emission delays with rxjs

Basically, the answer is using delayWhen to achieve the desired result.

I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357

The crucial part is:

const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();

const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));

dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
  console.log("Syncing data", {
    counter
  });

  syncedIndicator.innerHTML += `<li>${counter}</li>`;
});

Wrapped in a custom typescript operator:

import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  const gateTriggerFn = () => gateTrigger.pipe(
    filter((v) => v)
  );

  return (source: Observable<T | null | undefined>) => source.pipe(
    delayWhen(gateTriggerFn)
  );
}

It seems so far that this solution is doing what I intend it to do.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文