Rxjava JVM响应式扩展 Reactive Extensions

发布于 2024-10-01 13:00:57 字数 3449 浏览 12 评论 0

线程切换原理

  • 多次用 subscribeOn 指定上游线程为什么只有第一次有效?
  • 上游事件是怎么跑到子线程里执行的?
  • 子线程是谁如何创建的
  • 异步取消任务原理
final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

可以看出:把订阅任务放到了一个 Runnable 里执行

--

public abstract static class Worker implements Disposable {
  
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

  
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);        
    }


之后会把任务封装为 Worker,通过 Worker 的 schedule 方法开始执行这个 task。

Schedulers.newThread()

在 NewThreadWorker 类里:

public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
            POOLS.put(e, exec);
        }
        return exec;
    }

可以看出:在 NewThreadScheduler 的 createWorker() 方法中,通过其构建好的线程工厂,在 Worker 实现类的构造函数中创建了一个 ScheduledExecutorService 的实例, 核心线程为 1 ,是通过 SchedulerPoolFactory 创建的。

    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = schedulersHook.onSchedule(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }

subscribeOn 问题

因为上游 Observable 只有一个任务,就是 subscribe(准确的来说是 subscribeActual()),而 subscribeOn 要做的事情就是把上游任务切换到一个指定线程里,那么一旦被切换到了某个指定的线程里,后面的切换不就是没有意义了

异步取消任务

当递归执行 subscription cancel 的时候,总会跑到某个 subscriber 里面,我们称呼它为 A,它的 worker 正在执行,虽然执行代码的片段不一定是 A 的 run 或者是 onNext(有可能是它下游的某个 subscriber) 但是一定会运行在 A 的 worker 里面,所以执行 worker.dipose 停掉这个 worker 就可以取消异步任务(所有的 runnable 都会被封装成为 FutureTask 从而可以被取消)

重要结论:调用 subscribe 时的从下往上是 subscribeOn 切换线程,之后调用 onNext 传递数据时的从上往下是 ObserveOn 切换线程.

AndroidSchedulers.mainThread

参考 Worker 实现类 HandlerWorker 的 schedule()

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    /**忽略一些代码**/
    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    if (disposed) {
          handler.removeCallbacks(scheduled);
          return Disposables.disposed();
    }
    return scheduled;
}

这里传入了主线程的 handler 去执行。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

猫弦

暂无简介

0 文章
0 评论
24 人气
更多

推荐作者

玍銹的英雄夢

文章 0 评论 0

我不会写诗

文章 0 评论 0

十六岁半

文章 0 评论 0

浸婚纱

文章 0 评论 0

qq_kJ6XkX

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文