Rxjava JVM响应式扩展 Reactive Extensions
线程切换原理
- 多次用 subscribeOn 指定上游线程为什么只有第一次有效?
- 上游事件是怎么跑到子线程里执行的?
- 子线程是谁如何创建的
- 异步取消任务原理
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
可以看出:把订阅任务放到了一个 Runnable 里执行
--
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
之后会把任务封装为 Worker,通过 Worker 的 schedule 方法开始执行这个 task。
Schedulers.newThread()
在 NewThreadWorker 类里:
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
可以看出:在 NewThreadScheduler 的 createWorker() 方法中,通过其构建好的线程工厂,在 Worker 实现类的构造函数中创建了一个 ScheduledExecutorService 的实例, 核心线程为 1 ,是通过 SchedulerPoolFactory 创建的。
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
subscribeOn 问题
因为上游 Observable 只有一个任务,就是 subscribe(准确的来说是 subscribeActual()),而 subscribeOn 要做的事情就是把上游任务切换到一个指定线程里,那么一旦被切换到了某个指定的线程里,后面的切换不就是没有意义了
异步取消任务
当递归执行 subscription cancel 的时候,总会跑到某个 subscriber 里面,我们称呼它为 A,它的 worker 正在执行,虽然执行代码的片段不一定是 A 的 run 或者是 onNext(有可能是它下游的某个 subscriber) 但是一定会运行在 A 的 worker 里面,所以执行 worker.dipose 停掉这个 worker 就可以取消异步任务(所有的 runnable 都会被封装成为 FutureTask 从而可以被取消)
重要结论:调用 subscribe 时的从下往上是 subscribeOn 切换线程,之后调用 onNext 传递数据时的从上往下是 ObserveOn 切换线程.
AndroidSchedulers.mainThread
参考 Worker 实现类 HandlerWorker 的 schedule()
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
/**忽略一些代码**/
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
这里传入了主线程的 handler 去执行。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: TCP 和 UDP 特点和区别
下一篇: MyBatis 介绍和使用
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论