Concat Connectable Observables

发布于 2025-02-03 16:52:28 字数 784 浏览 3 评论 0原文

我有一个列表 connectableObservable,我想在上一个项目完成后从列表中运行一个项目。我已经尝试在列表上应用concat()方法,但显然此方法对ConnectableObservables不起作用。我该怎么做?

这就是我尝试的:

ConnectableObservable<Long> observable1 =
    Observable.timer(1500, TimeUnit.MILLISECONDS).publish();

ConnectableObservable<Long> observable2 =
    Observable.timer(1550, TimeUnit.MILLISECONDS).publish();


List<ConnectableObservable<Long>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);

Observable.concat(list).doOnNext(aLong -> {
    Log.i("result", aLong.toString());
}).subscribe();

observable1.connect();
observable2.connect();

在这里,observable2observable1完成后运行50毫秒,而不是预期的1550。

I have a List of ConnectableObservable, and I want to run one item from the list when the previous item is done. I've tried applying concat() method on the list, but apparently this method doesn't work on ConnectableObservables. How can I do this?

This is what I've tried:

ConnectableObservable<Long> observable1 =
    Observable.timer(1500, TimeUnit.MILLISECONDS).publish();

ConnectableObservable<Long> observable2 =
    Observable.timer(1550, TimeUnit.MILLISECONDS).publish();


List<ConnectableObservable<Long>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);

Observable.concat(list).doOnNext(aLong -> {
    Log.i("result", aLong.toString());
}).subscribe();

observable1.connect();
observable2.connect();

Here, observable2 runs 50 milliseconds after observable1 completes, not 1550 as expected.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

不喜欢何必死缠烂打 2025-02-10 16:52:28

您没有在ConnectableObservable上调用connect()方法,因此它没有开始发布任何内容。

放在

observable1.connect();
observable2.connect();

代码末尾。
另外,您可以将.autoconnect(1)添加到observable1observable2

它不能同时工作,如以下Junit确认:

    @Test
    void connectableTest() {
        TestScheduler testScheduler = new TestScheduler();
        ConnectableObservable<Integer> observable1 =
                Observable
                        .just(1, 2, 3)
                        .zipWith(
                                Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
                        .publish();
        ConnectableObservable<Integer> observable2 =
                Observable
                        .just(4, 5, 6)
                        .zipWith(
                                Observable.interval(3, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
                        .publish();

        List<ConnectableObservable<Integer>> list = new ArrayList<>();
        list.add(observable1);
        list.add(observable2);

        TestObserver<Integer> output = Observable.concat(list).test();

        observable1.connect();
        observable2.connect();

        testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
        output.assertValues(1,2,3,4,5,6).assertComplete();
    }

You didn't invoke connect() method on ConnectableObservable, so it hadn't started to publish anything.

Put

observable1.connect();
observable2.connect();

at the end of your code.
Alternatively you can add .autoConnect(1) to your observable1 and observable2.

It doesn't work simultaneously as following junit confirms it:

    @Test
    void connectableTest() {
        TestScheduler testScheduler = new TestScheduler();
        ConnectableObservable<Integer> observable1 =
                Observable
                        .just(1, 2, 3)
                        .zipWith(
                                Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
                        .publish();
        ConnectableObservable<Integer> observable2 =
                Observable
                        .just(4, 5, 6)
                        .zipWith(
                                Observable.interval(3, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
                        .publish();

        List<ConnectableObservable<Integer>> list = new ArrayList<>();
        list.add(observable1);
        list.add(observable2);

        TestObserver<Integer> output = Observable.concat(list).test();

        observable1.connect();
        observable2.connect();

        testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
        output.assertValues(1,2,3,4,5,6).assertComplete();
    }
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文