关于hbase多线程写入问题

发布于 2021-11-30 15:01:57 字数 596 浏览 794 评论 7

@cloud-coder 你好,想跟你请教个问题:

多线程写入hbase,官方已经摒弃了HtablePool方式写入,推荐使用HConnection#getTable方式。

问题:我不想put一次,就flushCommit。于是我在client端使用了setFlusthAo(false)以及缓存的大小XMB。

这种客户端只能达到table达到XMB的时候才能自动flushCommit。但是我们的业务只允许几秒延迟的,于是乎我使用了每隔三秒flushCommit一次(在getTable的时候,启动线程对这个table每隔3s flushCommit一次)。

发现Htable线程不是安全的,在我flushCommit的同时,会有其他线程对这个table进行写入。然后报异常

**ModificationException (具体名字忘记了)


请问对于这种问题,你有没有碰到过?然后是怎么处理的,谢谢!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(7

爱你是孤单的心事 2021-12-01 10:11:12

当然是以我自身的业务来说

羁拥 2021-12-01 10:10:55

hbase region的heap设置大点,如12G

使用的是什么版本的hbase,CDH5?

叹沉浮 2021-12-01 10:10:05

@cloud-coder  感谢您的答复。

我这边还有几个问题:

1、压缩完后(数据量达到1.6T),读的速度立马上升,但是写入速度速度下降,运行一段时间,会出现rpc连接一直断开现象。目前无法下手怎么优化,请问下你们对这方面怎么优化方法?

(我们有5台regionserver,压缩完后每台有70个region,每region 就1~2 个Store, 最大StoreFile是10G,只有一个簇)。

回眸一笑 2021-12-01 09:52:38

你的问题我没有遇到过,

夜司空 2021-12-01 08:28:12

可以在方法中通过调用table.flushcommits进行提交,一般情况下一个事务提交一次

可以是一行或多行数据

彼岸花ソ最美的依靠 2021-12-01 06:58:42

我在评论中回复了,请帮忙看下。谢谢:)

岁月打碎记忆 2021-11-30 22:13:43
// 初始化UserProvider
        UserProvider userProvider = UserProvider.instantiate(conf);

        // 初始化admin,ConnectionCache
        try {
            connectionCache = new ConnectionCache(conf, userProvider, cleanInterval, maxIdleTime);
            admin = new HBaseAdmin(conf);
        }
        catch (Exception e) {
            LOG.error("创建HBaseAdmin失败:" + e.getMessage());
        }

        // 初始化HTableFactory
        tableFactory = new HTableFactory() {
            @Override
            public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
                try {
                    return connectionCache.getTable(Bytes.toString(tableName));
                }
                catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
        };

        // 初始化Cache
        htablePools =
                CacheBuilder.newBuilder().expireAfterAccess(maxIdleTime, TimeUnit.MILLISECONDS).softValues()
                    .concurrencyLevel(4).build();
        htablePoolCreater = new Callable<HTablePool>() {
            public HTablePool call() {
                return new HTablePool(conf, maxPoolSize, tableFactory);
            }
        };


/**
     * 获取HTableInterface实例
     * 
     * @param tableName
     * @return
     */
    private static HTableInterface getTable(String tableName) {
        String currentUser = connectionCache.getEffectiveUser();
        try {
            HTablePool htablePool = htablePools.get(currentUser, htablePoolCreater);
            return htablePool.getTable(tableName);
        }
        catch (ExecutionException ee) {
            throw new RuntimeException(ee);
        }
    }

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文