是否有任何waituntil运算符或rxswift中的类似操作员来等待一个值?

发布于 2025-02-09 22:27:26 字数 744 浏览 2 评论 0原文

我正在使用rxswift来处理许多Async AvassetWriterInput写操作,但是在文件中写入更多缓冲区之前,我需要等待IsreadyFormoremediadata,我该如何处理?

基本上,可观察到的可观察到的是Asyncwritersubject发出的大量缓冲区,我想按照我收到的顺序编写所有缓冲区。

我有这个主题:

private var asyncWriter = ReplaySubject<(AVAssetWriterInput,CMSampleBuffer)>.create(bufferSize: 1)

我使用此代码散发了它的值:

asyncWriter.onNext((videoWriterInput, buffer))

我在这里订阅它以聆听:

disposable = asyncWriter.asObservable()
    .takeWhile {
        (writerPointer, _) in
            writerPointer.isReadyForMoreMediaData
        }.observeOn(MainScheduler.asyncInstance)
        .subscribe(onNext: { (writerPointer, buffer) in
            writerPointer.append(buffer)
    })

I am using RXSwift to handle with a lot of async AVAssetWriterInput write operations, but I need to wait for isReadyForMoreMediaData before writing more buffers inside the file, how can I handle this?

Basically, the observable receives a lot of buffers emitted by the asyncWriterSubject and I want to write all of them in the order that I am receiving.

I have this subject:

private var asyncWriter = ReplaySubject<(AVAssetWriterInput,CMSampleBuffer)>.create(bufferSize: 1)

I emit the values for it using this code:

asyncWriter.onNext((videoWriterInput, buffer))

And I am subscribing it here to listen:

disposable = asyncWriter.asObservable()
    .takeWhile {
        (writerPointer, _) in
            writerPointer.isReadyForMoreMediaData
        }.observeOn(MainScheduler.asyncInstance)
        .subscribe(onNext: { (writerPointer, buffer) in
            writerPointer.append(buffer)
    })

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

冰雪之触 2025-02-16 22:27:26

以下是一些有关如何处理背压的一般信息。

这将向CMSPlame Buffer写入cmsample buffer WriterPointer每秒可达100次。当isreadyFormoreMediAdata为false时,它将存储示例缓冲区,直到bool再次为true为止。

func example(asyncWriter: Observable<CMSampleBuffer>, writerPointer: AVAssetWriterInput) -> Disposable {
    enum Action {
        case buffer(CMSampleBuffer)
        case isReady(Bool)
    }
    var isReadyForMoreMediaData: Observable<Bool> {
        Observable<Int>.interval(.milliseconds(10), scheduler: MainScheduler.instance)
            .flatMap { [writerPointer] _ in Observable.just(writerPointer.isReadyForMoreMediaData) }
    }
    return Observable.merge(
        isReadyForMoreMediaData.map { Action.isReady($0) },
        asyncWriter.map { Action.buffer($0) }
    )
        .scan(into: (buffer: [CMSampleBuffer](), trigger: false, emission: CMSampleBuffer?.none), accumulator: { current, new in
            switch new {
            case let .buffer(buff):
                if current.trigger {
                    if current.buffer.isEmpty {
                        current.emission = buff
                    }
                    else {
                        current.emission = current.buffer[0]
                        current.buffer.removeFirst()
                        current.buffer.append(buff)
                    }
                }
                else {
                    current.buffer.append(buff)
                }
            case let .isReady(trig):
                current.trigger = trig
                if trig && !current.buffer.isEmpty {
                    current.emission = current.buffer[0]
                    current.buffer.removeFirst()
                }
            }
        })
        .compactMap { $0.emission }
        .observe(on: MainScheduler.instance)
        .subscribe(onNext: { buffer in
            writerPointer.append(buffer)
        })
}

Here's some general information on how to handle back pressure.

This will write a CMSampleBuffer to the writerPointer up to 100 times per second. When isReadyForMoreMediaData is false, it will store sample buffers until the Bool is true again.

func example(asyncWriter: Observable<CMSampleBuffer>, writerPointer: AVAssetWriterInput) -> Disposable {
    enum Action {
        case buffer(CMSampleBuffer)
        case isReady(Bool)
    }
    var isReadyForMoreMediaData: Observable<Bool> {
        Observable<Int>.interval(.milliseconds(10), scheduler: MainScheduler.instance)
            .flatMap { [writerPointer] _ in Observable.just(writerPointer.isReadyForMoreMediaData) }
    }
    return Observable.merge(
        isReadyForMoreMediaData.map { Action.isReady($0) },
        asyncWriter.map { Action.buffer($0) }
    )
        .scan(into: (buffer: [CMSampleBuffer](), trigger: false, emission: CMSampleBuffer?.none), accumulator: { current, new in
            switch new {
            case let .buffer(buff):
                if current.trigger {
                    if current.buffer.isEmpty {
                        current.emission = buff
                    }
                    else {
                        current.emission = current.buffer[0]
                        current.buffer.removeFirst()
                        current.buffer.append(buff)
                    }
                }
                else {
                    current.buffer.append(buff)
                }
            case let .isReady(trig):
                current.trigger = trig
                if trig && !current.buffer.isEmpty {
                    current.emission = current.buffer[0]
                    current.buffer.removeFirst()
                }
            }
        })
        .compactMap { $0.emission }
        .observe(on: MainScheduler.instance)
        .subscribe(onNext: { buffer in
            writerPointer.append(buffer)
        })
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文