Append HDFS报错 is already the current lease holder

发布于 2022-09-11 21:10:02 字数 5553 浏览 27 评论 0

使用Kafka消费者写入HDFS报错,逻辑是这样:

  1. 第一次写HDFS会创建一个文件
  2. 第二次以及接下来会接着往这个文件APPEND数据

当中其实有报错两次错误

  1. 第一次报错信息

    Caused by: org.apache.hadoop.ipc.RemoteException: append: lastBlock=blk_1073851558_23293749 of src=/logs/4_CS.JCY/2019/06/03/T_LOG_RECORD_CS/web-Thread-3 is not sufficiently replicated yet.

    提示好像是因为写的备份数的问题,查询了下网上的问题,确实有人遇到,我就改了下这个消费者写HDFS的备份数为1,也就是只会写一份。接下来这个报错真的没有发生了,如果有更好的处理方式请务必告知我!

  2. 接着就是第二次报错了

    Caused by: org.apache.hadoop.ipc.RemoteException: Failed to APPEND_FILE /logs/4_CS.JCY/2019/07/18/T_LOG_RECORD_CS/web-Thread-3 for DFSClient_NONMAPREDUCE_-558981961_18 on IP地址 because DFSClient_NONMAPREDUCE_-558981961_18 is already the current lease holder.
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2970)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2766)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:3073)
            at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:3042)
            at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:760)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:429)
            at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
            at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2313)
            at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2309)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.Subject.doAs(Subject.java:422)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1724)
            at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2307)
            at org.apache.hadoop.ipc.Client.call(Client.java:1475)
            at org.apache.hadoop.ipc.Client.call(Client.java:1412)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
            at com.sun.proxy.$Proxy60.append(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.append(ClientNamenodeProtocolTranslatorPB.java:328)
            at sun.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
            at com.sun.proxy.$Proxy61.append(Unknown Source)
            at org.apache.hadoop.hdfs.DFSClient.callAppend(DFSClient.java:1808)
            at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1877)
            at org.apache.hadoop.hdfs.DFSClient.append(DFSClient.java:1847)
            at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:340)
            at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:336)
            at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
            at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:348)
            at org.apache.hadoop.hdfs.DistributedFileSystem.append(DistributedFileSystem.java:318)
            at org.apache.hadoop.fs.FileSystem.append(FileSystem.java:1166)
            at com.sharing.hdfs.HdfsHelper$.getFSDataOutputSteam(HdfsHelper.scala:225)
            at com.sharing.hdfs.HdfsHelper$.writeLines(HdfsHelper.scala:149)

    报错信息应该是租约过期的问题,尝试了网上介绍的更新租约方法,但是每天都会有报错信息,虽然在报错后,一段时间可以恢复,但终究不是很舒服这种处理方式!

相关代码

/**
    * 创建fsDataOutputSteam
    *
    * @param path
    * @return
    */
  def getFSDataOutputSteam(path: String): FSDataOutputStream = {
    val pathIsExist = isExist(path)
    if (pathIsExist) {
      fileSystem.append(path)
    } else {
      fileSystem.create(path)
    }
  }
 /**
    * 写多行
    *
    * @param lines
    * @param path
    */
  def writeLines(lines: java.util.List[String], path: String): Unit = {


    var outputStream: FSDataOutputStream = null
    try {

      outputStream = getFSDataOutputSteam(path)
      lines.foreach({
        line =>
          outputStream.write(line.trim.getBytes("UTF-8"))
          outputStream.write("\n".getBytes("UTF-8"))
      })
      LOGGER.info("HDFS写入成功!")
    }
    catch {
      case e: IOException =>
        LOGGER.error(e.getMessage, e)
        updateLease(path)
        throw new IOException(e)
    } finally {
      if (Objects.nonNull(outputStream)) {
        outputStream.close()
      }
    }
  }
// 更新租约,不懂有没有用
  def updateLease(path: String): Unit = {

    LOGGER.error("开始更新租约")
    fileSystem.close()
    fileSystem = FileSystem.get(uri, conf, userName).asInstanceOf[DistributedFileSystem]
    fileSystem.recoverLease(path)
    TimeUnit.SECONDS.sleep(3)
  }

你期待的结果是什么?

希望有比较正规有效的写HDFS方法,目前感觉使用HDFS APPEND API总是会报错!目前碰到上面这两个问题。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

影子是时光的心 2022-09-18 21:10:02

老哥,有好的办法了么,我跟你的场景差不多,kafka读取写入hdfs,用的是springboot初始化了个fileSystem 的bean,然后跑着跑着就报错了,already the current lease holder.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文