解决可观察到无法正常工作的多个承诺

发布于 2025-01-22 03:09:36 字数 1086 浏览 2 评论 0原文

我正在使用firebase存储,并且正在尝试通过函数调用加载所有资产。获取资产URL的唯一方法是呼叫getDownloadurl返回承诺。我需要为每项资产打电话给它,但由于某种原因,我无法等待所有承诺要等待。

我认为从Mergemap返回承诺会让它等待所有人,但事实并非如此。

我看了很多有关承诺和RXJ的问题,但我似乎无法弄清楚该代码出了什么问题。

getAssets() {

    return this.authService.user$.pipe(
      first(),
      switchMap(user => defer(() => from(this.afs.storage.ref(`${user.uid}/assets`).listAll()))),
      switchMap(assets => from(assets.items).pipe(
        mergeMap(async (asset) => {
        
          return new Promise((res, rej) => {
          
            asset.getDownloadURL().then(url => {
              
              const _asset = {
                name: asset.name,
                url,
              };
  
              this.assets.push(_asset);

              res(_asset);
            })
            .catch((e) => rej(e));
          });
        }),
      )),
      map(() => this.assets),
    );
  }

  ...

  this.getAssets().subscribe(assets => console.log(assets)); // this runs before all asset's url has been resolved

I'm using Firebase Storage and I'm trying to load all assets via a function call. The only way to get an assets url is to call getDownloadURL which returns a promise. I need to call this for every asset but I can't make it wait for all promises to be done before continuing for some reason.

I thought returning a promise from mergeMap would make it wait for all of them but that doesn't seem to be the case.

I've look at a number of questions regarding promises and RXJS but I can't seem to figure out what's wrong with the code.

getAssets() {

    return this.authService.user$.pipe(
      first(),
      switchMap(user => defer(() => from(this.afs.storage.ref(`${user.uid}/assets`).listAll()))),
      switchMap(assets => from(assets.items).pipe(
        mergeMap(async (asset) => {
        
          return new Promise((res, rej) => {
          
            asset.getDownloadURL().then(url => {
              
              const _asset = {
                name: asset.name,
                url,
              };
  
              this.assets.push(_asset);

              res(_asset);
            })
            .catch((e) => rej(e));
          });
        }),
      )),
      map(() => this.assets),
    );
  }

  ...

  this.getAssets().subscribe(assets => console.log(assets)); // this runs before all asset's url has been resolved

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

只是一片海 2025-01-29 03:09:36

概述

mergemap所有内部可观察物。它旋转N内部可观察的管道,并并行运行,并将所有值串出相同的耦合在管道底部(在这种情况下您的订阅语句)作为单个排放。因此,为什么this.getAssets()。订阅(Assets => console.log(Assets))在所有并行内部Mergemap Pipes完成其单个计算之前运行,因为MERGEMAP不会在发出之前等待所有这些(在它们完成时它会一一发射)。如果要等待可观察到的管道完成,则需要使用forkjoin


fork join

forkjoin您有一组可观察的物品,只关心每个的最终发射值。一个常见的用例是,如果您希望在页面加载(或其他某些事件)上发出多个请求,并且仅在收到所有人的响应时才想采取行动。通过这种方式,它类似于您可能使用Promise.All的方式。


解决方案

getAssets(): Observable<Asset[]> {
  return this.authService.user$.pipe(
    // first() will deliver an EmptyError to the observer's error callback if the
    // observable completes before any next notification was sent. If you don't
    // want this behavior, use take(1) instead.
    first(),
    // Switch to users firebase asset stream.
    switchMap(user => {
      // You might have to tweak this part. I'm not exactly sure what 
      // listAll() returns. I guessed that it returns a promise with
      // firebase asset metadata.
      return from(this.afs.storage.ref(`${user.uid}/assets`).listAll());
    }),
    // Map to objects that contain method to get image url.
    map(firebaseAssetMetadata => firebaseAssetMetadata?.items ?? []),
    // Switch to parallel getDownloadUrl streams.
    switchMap(assets => {
      // Not an rxjs map, a regular list map. Returns a list of getAssetUrlPipes.
      const parallelGetAssetUrlPipes = assets.map(asset => {
        return from(asset.getDownloadUrl()).pipe(
          map(url => { name: asset.name, url })
        );
      });
      // 1) Listen to all parallel pipes.
      // 2) Wait until they've all completed.
      // 3) Merge all parallel data into a list.
      // 4) Then move list down the pipe.
      return forkJoin(parallelGetAssetUrlPipes);
    }),
    // Outputs all parallel pipe data as a single emission in list form.
    // Set local variable to users asset data.
    tap(assetObjects => this.assets = assetObjects)
  );
}

// Outputs the list of user asset data.
this.getAssets().subscribe(console.log);

祝您好运,并享受您的瑞典肉丸!

Overview

mergeMap doesn't wait for all internal observables. It spins up n internal observable pipes that run in parallel, and spits all the values out the same coupling at the bottom of the pipe (your subscribe statement in this case) as individual emissions. Hence why this.getAssets().subscribe(assets => console.log(assets)) runs before all your parallel internal mergeMap pipes complete their individual computations, because mergeMap doesn't wait for all of them before emitting (it will emit one by one as they finish). If you want to wait for n observable pipes to finish, then you need to use forkJoin.


Fork Join

forkJoin is best used when you have a group of observables and only care about the final emitted value of each. One common use case for this is if you wish to issue multiple requests on page load (or some other event) and only want to take action when a response has been received for all. In this way it is similar to how you might use Promise.all.


Solution

getAssets(): Observable<Asset[]> {
  return this.authService.user$.pipe(
    // first() will deliver an EmptyError to the observer's error callback if the
    // observable completes before any next notification was sent. If you don't
    // want this behavior, use take(1) instead.
    first(),
    // Switch to users firebase asset stream.
    switchMap(user => {
      // You might have to tweak this part. I'm not exactly sure what 
      // listAll() returns. I guessed that it returns a promise with
      // firebase asset metadata.
      return from(this.afs.storage.ref(`${user.uid}/assets`).listAll());
    }),
    // Map to objects that contain method to get image url.
    map(firebaseAssetMetadata => firebaseAssetMetadata?.items ?? []),
    // Switch to parallel getDownloadUrl streams.
    switchMap(assets => {
      // Not an rxjs map, a regular list map. Returns a list of getAssetUrlPipes.
      const parallelGetAssetUrlPipes = assets.map(asset => {
        return from(asset.getDownloadUrl()).pipe(
          map(url => { name: asset.name, url })
        );
      });
      // 1) Listen to all parallel pipes.
      // 2) Wait until they've all completed.
      // 3) Merge all parallel data into a list.
      // 4) Then move list down the pipe.
      return forkJoin(parallelGetAssetUrlPipes);
    }),
    // Outputs all parallel pipe data as a single emission in list form.
    // Set local variable to users asset data.
    tap(assetObjects => this.assets = assetObjects)
  );
}

// Outputs the list of user asset data.
this.getAssets().subscribe(console.log);

Good luck out there, and enjoy your Swedish meatballs!

南七夏 2025-01-29 03:09:36
const { from } = rxjs
const { mergeMap } = rxjs.operators

const assets = [1,2,3,4,5]

function getUrl (index) {
  return new Promise((res) => {
    setTimeout(() => res(`http://example.com/${index}`), Math.random() * 3 + 1000)
  })
}

// add param2 1 for mergeMap === concatMap
from(assets).pipe(
  mergeMap(asset => {
    return getUrl(asset)
  }, 1)
).subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>

使用concatmap一个一一运行。

const { from } = rxjs
const { mergeMap } = rxjs.operators

const assets = [1,2,3,4,5]

function getUrl (index) {
  return new Promise((res) => {
    setTimeout(() => res(`http://example.com/${index}`), Math.random() * 3 + 1000)
  })
}

// add param2 1 for mergeMap === concatMap
from(assets).pipe(
  mergeMap(asset => {
    return getUrl(asset)
  }, 1)
).subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.5.5/rxjs.umd.min.js"></script>

use concatMap to run one by one.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文