Spring 系列
- IoC 容器
- AOP
- SpringMVC
- Spring 事务
- Spring 源码故事(瞎编版)
- Spring 整体脉络
- Spring 类解析
- Spring 自定义标签解析
- Spring Scan 包扫描
- Spring 注解工具类
- Spring 别名注册
- Spring 标签解析类
- Spring ApplicationListener
- Spring messageSource
- Spring 自定义属性解析器
- Spring 排序工具
- Spring-import 注解
- Spring-定时任务
- Spring StopWatch
- Spring 元数据
- Spring 条件接口
- Spring MultiValueMap
- Spring MethodOverride
- Spring BeanDefinitionReaderUtils
- Spring PropertyPlaceholderHelper
- Spring-AnnotationFormatterFactory
- Spring-Formatter
- Spring-Parser
- Spring-Printer
- Spring5 新特性
- Spring RMI
- Spring Message
- SpringBoot
- SpringBootBatch
- Spring Cloud
- SpringSecurity
MyBatis
- 基础支持层
- 核心处理层
- 类解析
Netty
- 网络 IO 技术基础
- JDK1.8 NIO 包 核心组件源码剖析
- Netty 粘拆包及解决方案
- Netty 多协议开发
- 基于 Netty 开发服务端及客户端
- Netty 主要组件的源码分析
- Netty 高级特性
- Netty 技术细节源码分析
Dubbo
- 架构设计
- SPI 机制
- 注册中心
- 远程通信
- RPC
- 集群
Tomcat
- Servlet 与 Servlet 容器
- Web 容器
Redis
Nacos
Sentinel
RocketMQ
- RocketMQ NameServer 与 Broker 的通信
- RocketMQ 生产者启动流程
- RocketMQ 消息发送流程
- RocketMQ 消息发送存储流程
- RocketMQ MappedFile 内存映射文件详解
- RocketMQ ConsumeQueue 详解
- RocketMQ CommitLog 详解
- RocketMQ IndexFile 详解
- RocketMQ 消费者启动流程
- RocketMQ 消息拉取流程
- RocketMQ Broker 处理拉取消息请求流程
- RocketMQ 消息消费流程
番外篇(JDK 1.8)
- 基础类库
- 集合
- 并发编程
学习心得
RocketMQ MappedFile 内存映射文件详解
该文所涉及的 RocketMQ 源码版本为 4.9.3。
RocketMQ MappedFile 内存映射文件详解
1、MappedFile 初始化
private void init(final String fileName, final int fileSize) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try {
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
} catch (FileNotFoundException e) {
log.error("Failed to create file " + this.fileName, e);
throw e;
} catch (IOException e) {
log.error("Failed to map file " + this.fileName, e);
throw e;
} finally {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
}
}
初始化fileFromOffset
,因为 commitLog 文件夹下的文件都是以偏移量为命名的,所以转成了 long 类型
确认文件目录是否存在,不存在则创建
public static void ensureDirOK(final String dirName) {
if (dirName != null) {
if (dirName.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
String[] dirs = dirName.trim().split(MessageStoreConfig.MULTI_PATH_SPLITTER);
for (String dir : dirs) {
createDirIfNotExist(dir);
}
} else {
createDirIfNotExist(dirName);
}
}
}
通过RandomAccessFile
设置 fileChannel
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
使用 NIO 内存映射将文件映射到内存中
this.mappedByteBuffer = this.fileChannel.map(MapMode.*READ_WRITE*, 0, fileSize);
2、MappedFile 提交
public int commit(final int commitLeastPages) {
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
if (this.isAbleToCommit(commitLeastPages)) {
if (this.hold()) {
commit0();
this.release();
} else {
log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
}
}
// All dirty data has been committed to FileChannel.
if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
this.transientStorePool.returnBuffer(writeBuffer);
this.writeBuffer = null;
}
return this.committedPosition.get();
}
如果 wroteBuffer 为空,直接返回 wrotePosition
if (writeBuffer == null) {
//no need to commit data to file channel, so just regard wrotePosition as committedPosition.
return this.wrotePosition.get();
}
判断是否执行 commit 操作:
如果文件已满,返回 true
if (this.isFull()) {
return true;
}
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
}
commitLeastPages 为本次提交的最小页数,如果 commitLeastPages 大于 0,计算当前写指针(wrotePosition
)与上一次提交的指针committedPosition
的差值 除以页OS_PAGE_SIZE
的大小得到脏页数量,如果大于 commitLeastPages,就可以提交。如果 commitLeastPages 小于 0,则存在脏页就提交
if (commitLeastPages > 0) {
return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= commitLeastPages;
}
return write > flush;
MapperFile 具体的提交过程,首先创建 writeBuffer
的共享缓存区,设置 position 为上一次提交的位置committedPosition
,设置 limit 为wrotePosition
当前写指针,接着将 committedPosition 到 wrotePosition 的数据写入到 FileChannel 中,最后更新 committedPosition 指针为 wrotePosition
protected void commit0() {
int writePos = this.wrotePosition.get();
int lastCommittedPosition = this.committedPosition.get();
if (writePos - lastCommittedPosition > 0) {
try {
ByteBuffer byteBuffer = writeBuffer.slice();
byteBuffer.position(lastCommittedPosition);
byteBuffer.limit(writePos);
this.fileChannel.position(lastCommittedPosition);
this.fileChannel.write(byteBuffer);
this.committedPosition.set(writePos);
} catch (Throwable e) {
log.error("Error occurred when commit data to FileChannel.", e);
}
}
}
3、MappedFile 刷盘
判断是否要进行刷盘
文件是否已满
if (this.isFull()) {
return true;
}
public boolean isFull() {
return this.fileSize == this.wrotePosition.get();
}
如果flushLeastPages
大于 0,判断写数据指针位置-上次刷盘的指针位置, 然后除以OS_PAGE_SIZE 是否大于等于
flushLeastPages
如果 flushLeastPages 小于等于 0,判断是否有要刷盘的数据
if (flushLeastPages > 0) {
return ((write /OS_PAGE_SIZE) - (flush /OS_PAGE_SIZE)) >= flushLeastPages;
}
return write > flush;
获取最大读指针
public int getReadPosition() {
return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
}
将数据刷出到磁盘
如果writeBuffer
不为空或者通道的 position 不等于 0,通过 fileChannel 将数据刷新到磁盘
否则通过 MappedByteBuffer 将数据刷新到磁盘
4、MappedFile 销毁
public boolean destroy(final long intervalForcibly) {
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
+ this.getFlushedPosition() + ", "
+ UtilAll.computeElapsedTimeMilliseconds(beginTime));
} catch (Exception e) {
log.warn("close file channel " + this.fileName + " Failed. ", e);
}
return true;
} else {
log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
+ " Failed. cleanupOver: " + this.cleanupOver);
}
return false;
}
1> 关闭 MappedFile
第一次调用时 this.available为true
,设置 available 为 false,设置第一次关闭的时间戳为当前时间戳,调用 release()释放资源,只有在引用次数小于 1 的时候才会释放资源,如果引用次数大于 0,判断当前时间与 firstShutdownTimestamp 的差值是否大于最大拒绝存活期intervalForcibly
,如果大于等于最大拒绝存活期,将引用数减少 1000,直到引用数小于 0 释放资源
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
2> 判断是否清理完成
是否清理完成的标准是引用次数小于等于 0 并且清理完成标记 cleanupOver 为 true
public boolean isCleanupOver() {
return this.refCount.get() <= 0 && this.cleanupOver;
}
3> 关闭文件通道 fileChannel
this.fileChannel.close();
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论