rxbus连续发送消息报错

发布于 2022-09-04 02:11:19 字数 6722 浏览 17 评论 0

我用rxbus连续发送大量消息报错

08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main

                                                           Process: com.dituwuyou, PID: 10637
                                                           java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
                                                               at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
                                                               at android.os.Handler.handleCallback(Handler.java:739)
                                                               at android.os.Handler.dispatchMessage(Handler.java:95)
                                                               at android.os.Looper.loop(Looper.java:148)
                                                               at android.app.ActivityThread.main(ActivityThread.java:5417)
                                                               at java.lang.reflect.Method.invoke(Native Method)
                                                               at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
                                                               at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
                                                            Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
                                                               at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
                                                               at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
                                                               at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
                                                               at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
                                                               at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
                                                               at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
                                                               at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
                                                               at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
                                                               at android.os.Handler.handleCallback(Handler.java:739) 
                                                               at android.os.Handler.dispatchMessage(Handler.java:95) 
                                                               at android.os.Looper.loop(Looper.java:148) 
                                                               at android.app.ActivityThread.main(ActivityThread.java:5417) 
                                                               at java.lang.reflect.Method.invoke(Native Method) 
                                                               at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) 
                                                               at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) 
                                                            Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
                                                               at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
                                                               at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
                                                               at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
                                                               at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
                                                               at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
                                                               at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
                                                               at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
                                                               at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
                                                               at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
                                                               at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
                                                               at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
                                                               at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
                                                               at java.lang.Thread.run(Thread.java:818)

我的rxbus是这样定义的

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by xg on 2016/6/24.
 * 消息传递(替换handler,eventbus)
 */
public class RxBus {
    private static volatile RxBus mInstance;
    private final Subject bus;

    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    /**
     * 单例模式RxBus
     *
     * @return
     */
    public static RxBus getRxBusSingleton() {
        RxBus rxBus2 = mInstance;
        if (mInstance == null) {
            synchronized (RxBus.class) {
                rxBus2 = mInstance;
                if (mInstance == null) {
                    rxBus2 = new RxBus();
                    mInstance = rxBus2;
                }
            }
        }
        return rxBus2;
    }

    /**
     * 发送消息
     *
     * @param object
     */
    public void send(Object object) {
        bus.onNext(object);
    }

    /**
     * 接收消息
     *
     * @return
     */
    public Observable toObserverable() {
        return bus;
    }
}

这是第45行
bus.onNext(object);
应该是出现背压了

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

等风来 2022-09-11 02:11:19

1.看不懂你为什么要加个rxBus2
2.要处理onError的情况
3.先注册 后send

潇烟暮雨 2022-09-11 02:11:19

用的什么版本的RxJava. 在1.1.6版本中已经没有rx.subjects.PublishSubject$PublishSubjectProducer这个类了

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文