HDFS append error: "is already the current lease holder"
A Kafka consumer that writes to HDFS keeps hitting errors. The logic is:
- The first write to HDFS creates the file
- The second and every subsequent write appends data to that file
Two different errors have come up along the way.
The first error:
Caused by: org.apache.hadoop.ipc.RemoteException: append: lastBlock=blk_1073851558_23293749 of src=/logs/4_CS.JCY/2019/06/03/T_LOG_RECORD_CS/web-Thread-3 is not sufficiently replicated yet.
The message seems to point at the replication factor of the written files. Searching around, others have indeed hit this problem, so I changed the replication factor for the files this consumer writes to 1, i.e. only a single replica is kept. Since then this error really has stopped occurring; if there is a better way to handle it, please do let me know!
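For reference, a minimal sketch of lowering the replication factor on the client side (assuming a Hadoop Configuration is built before the FileSystem is obtained; the path is illustrative, not from the post):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// "dfs.replication" is a client-side setting: it is read when a new
// file is created, so setting it here affects files this client writes.
val conf = new Configuration()
conf.set("dfs.replication", "1")
val fs = FileSystem.get(conf)

// Alternatively, pass the replication factor explicitly at create time:
val out = fs.create(new Path("/logs/example"), 1.toShort)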
Then came the second error:
Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE /logs/4_CS.JCY/2019/07/18/T_LOG_RECORD_CS/web-Thread-3 for DFSClient_NONMAPREDUCE_-558981961_18 on <IP address> because DFSClient_NONMAPREDUCE_-558981961_18 is already the current lease holder.
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2970)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2766)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3073)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3042)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:760)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:429)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
    at org.apache.hadoop.ipc.Client.call(Client.java:1475)
    at org.apache.hadoop.ipc.Client.call(Client.java:1412)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy60.append(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
    at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy61.append(Unknown Source)
    at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
    at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
    at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
    at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
    at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
    at com.sharing.hdfs.HdfsHelper$.getFSDataOutputSteam(HdfsHelper.scala:225)
    at com.sharing.hdfs.HdfsHelper$.writeLines(HdfsHelper.scala:149)
This one looks like a lease problem. Note that the lease holder named in the message, DFSClient_NONMAPREDUCE_-558981961_18, is the same client that is trying to append, which points at the previous output stream on the file never having been fully closed (or at two threads appending to the same file at once). I tried the lease-recovery approach described online, but the error still shows up every day. It does recover on its own after a while, yet that is hardly a comfortable way to leave it!
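For a one-off recovery of a stuck file, the same lease recovery can also be triggered from the command line (assuming Hadoop 2.7 or later, where the debug subcommand is available):

hdfs debug recoverLease -path /logs/4_CS.JCY/2019/07/18/T_LOG_RECORD_CS/web-Thread-3 -retries 3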
The relevant code:
/**
 * Create an FSDataOutputStream: append if the file exists, create it otherwise.
 *
 * @param path file path on HDFS
 * @return an open FSDataOutputStream
 */
def getFSDataOutputSteam(path: String): FSDataOutputStream = {
  val pathIsExist = isExist(path)
  if (pathIsExist) {
    // Re-opening for append acquires the file lease; if this client still
    // holds the lease from an earlier stream, the NameNode rejects the call
    // with "already the current lease holder".
    fileSystem.append(new Path(path))
  } else {
    fileSystem.create(new Path(path))
  }
}
/**
 * Write multiple lines to the file at the given path.
 *
 * @param lines lines to write
 * @param path  file path on HDFS
 */
def writeLines(lines: java.util.List[String], path: String): Unit = {
  var outputStream: FSDataOutputStream = null
  try {
    outputStream = getFSDataOutputSteam(path)
    // Needs import scala.collection.JavaConverters._ to iterate a java.util.List
    lines.asScala.foreach { line =>
      outputStream.write(line.trim.getBytes("UTF-8"))
      outputStream.write("\n".getBytes("UTF-8"))
    }
    LOGGER.info("Write to HDFS succeeded!")
  } catch {
    case e: IOException =>
      LOGGER.error(e.getMessage, e)
      updateLease(path)
      throw new IOException(e)
  } finally {
    // Closing the stream releases the lease; if close() fails, the lease
    // stays with this client and the next append attempt is rejected.
    if (Objects.nonNull(outputStream)) {
      outputStream.close()
    }
  }
}
// Recover the lease; not sure whether this actually helps.
def updateLease(path: String): Unit = {
  LOGGER.error("Starting lease recovery")
  // Close the old FileSystem handle, open a fresh one, then ask the
  // NameNode to recover the lease on the file.
  fileSystem.close()
  fileSystem = FileSystem.get(uri, conf, userName).asInstanceOf[DistributedFileSystem]
  fileSystem.recoverLease(new Path(path))
  TimeUnit.SECONDS.sleep(3)
}
What result are you expecting?
I'm hoping for a proper, reliable way of writing to HDFS. Right now it feels like the HDFS append API always ends up throwing errors sooner or later; the two problems above are what I've hit so far.
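One pattern that may help here (a sketch of a common approach, not the post author's code; all names are illustrative) is to open each file once, cache the stream, and call hflush() after every batch instead of closing and re-appending. The lease is then acquired a single time per file, which sidesteps both the "already the current lease holder" rejection and the last-block replication check that each append triggers:

import java.nio.charset.StandardCharsets
import scala.collection.mutable
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}

// One long-lived output stream per path. hflush() makes the written data
// visible to readers without closing the stream, so the lease never has
// to be re-negotiated between batches.
class StreamCache(fs: FileSystem) {
  private val streams = mutable.Map.empty[String, FSDataOutputStream]

  private def streamFor(path: String): FSDataOutputStream =
    streams.getOrElseUpdate(path, {
      val p = new Path(path)
      if (fs.exists(p)) fs.append(p) else fs.create(p)
    })

  def writeLines(lines: Seq[String], path: String): Unit = {
    val out = streamFor(path)
    lines.foreach(line => out.write((line.trim + "\n").getBytes(StandardCharsets.UTF_8)))
    out.hflush() // flush to the DataNode pipeline without releasing the lease
  }

  // Close all streams on shutdown, or when rolling over to a new day's files.
  def closeAll(): Unit = {
    streams.values.foreach(_.close())
    streams.clear()
  }
}

Note that HDFS allows only one lease holder per file, so this still assumes a single writer thread owns each path.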
Comments (1)
Bro, have you found a good solution yet? My scenario is almost identical: reading from Kafka and writing to HDFS, with a fileSystem bean initialized by Spring Boot, and after running for a while it throws "already the current lease holder".