安卓开发 之 线程池
记住构造方法:
ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心线程大小
- maximumPoolSize:最大线程数量,= 核心线程 + 空闲线程
- keepAliveTime、unit: 空闲线程能活多久
- workQueue:内部容器类型
- threadFactory:自己实现线程工厂
- handler:自己实现拒绝策略
- 内部容器 BlockingQueue:具体实现:LinkedBlockingQueue(无界队列)、 ArrayBlockingQueue(有界队列)等
- 每一个任务都封装成了 Worker 并继承 AQS 抽象类
- 什么是 AQS?
- 拒绝策略:
- DiscardPolicy,新任务直接丢弃
- DiscardOldestPolicy, 丢弃最早入队的任务
- AbortPolicy,throw 报错
- CallerRunsPolicy,让当前线程执行该任务
- 特点:
- 在线程池创建初期,里面的线程不会立即启动,而是等到有任务提交时才启动,除非调用 prestartAllCoreThreads()
- 使用 SynchronousQueue 时,一般把线程池的 corePoolSize 设为 0,是为了让任务完成后工作线程得以销毁并让 JVM 顺利退出。不然会发生:当线程池的线程数量 等于 corePoolSize 时,只有当工作队列填满后才会新建线程的情况。
不用拒绝策略,自定义
public class BoundedExecutor {
private Executor executor;
private Semaphore semaphore;
public BoundedExecutor(Executor executor, int bound){
this.executor = executor;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command){
try {
semaphore.acquire();
executor.execute(new Runnable() {
@Override
public void run() {
try{
command.run();
}finally {
semaphore.release();
}
}
});
} catch (InterruptedException e) {
e.printStackTrace();
semaphore.release();
}
}
}
- 使用 Semaphore 信号量来起到阻塞的作用
- 必须注意 semaphore.release() 必须执行无论最后成功与否,和数据库游标、流 close 一样
添加统计信息
- beforeExecute:执行前回调
- afterExecute:执行后回调
- terminated:所有任务完成并且 Workder 线程关闭之后回调
- 使用 ThreadLocal 读写
- AtomicLong 保证原子操作
public class TimingThreadPool extends ThreadPoolExecutor {
private ThreadLocal<Long> startTime = new ThreadLocal<>();
private Logger log = Logger.getLogger("TimingThreadPool");
private AtomicLong numTasks = new AtomicLong();
private AtomicLong totalTime = new AtomicLong();
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r));
startTime.set(System.nanoTime());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTasks.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: start %s, time =%dns", t, r, taskTime));
}finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
}finally {
super.terminated();
}
}
}
线程饥饿死锁
意思是在线程池中,所有正在执行任务的线程都因等待工作队列的任务完成而阻塞的情况。
联想 Rxjava 的背压
背压是指在__异步场景__中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略,简而言之,背压是流速控制的一种策略。
- 背压策略的一个前提是异步环境,也就是说,被观察者和观察者处在不同的线程环境中。
- 背压并不是一个像 flatMap 一样可以在程序中直接使用的操作符,他只是一种控制事件流速的策略。
策略
- sample 操作符,丢弃
- buffer 操作符,把多余事件放到缓存池慢慢处理
- onBackpressurebuffer 操作符:把 observable 发送出来的事件做缓存,当 request 方法被调用的时候,给下层流发送一个 item(如果给这个缓存区设置了大小,那么超过了这个大小就会抛出异常)。
- onBackpressureDrop 操作符:将 observable 发送的事件抛弃掉,直到 subscriber 再次调用 request(n)方法的时候,就发送给它这之后的 n 个事件。
- Rxjava2 是怎么处理的
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: Android 安卓 UI 渲染优化
下一篇: MyBatis 介绍和使用
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论