pyspark下foreachPartition()向hbase中写数据,数据没有完全写入hbase中

发布于 2022-09-12 22:39:30 字数 1900 浏览 21 评论 0

1.问题描述

在使用pyspark过程中,遇到了一个向hbase中写数据的问题,在foreachPartition()方法中使用happybase对每个partition中的数据进行写入hbase的时候会出现数据丢失的问题,在hbase中并未完全的写入所有的数据,只写入了一小部分。

2.具体的业务代码如下:

articleVector是文章的向量,similar是文章之间的相似度

article_vector表结构如下:

create temporary table article.article_vector
(
    id            string comment 'id',
    major_id      int comment 'major_id',
    vector array<string> comment 'keyword vector'
);

计算相似的代码:

from pyspark.ml.feature import Word2Vec
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import Word2VecModel
from pyspark.ml.feature import BucketedRandomProjectionLSH

articleVector = spark.sql("select * from article_vector")
def toVector(row):
    return row.id, Vectors.dense(row.vector)
    
train = articleVector.rdd.map(toVector).toDF(["id", "vector"])
brp = BucketedRandomProjectionLSH(inputCol='vector', outputCol='hashes', seed=12345, bucketLength=1.0)
model = brp.fit(train)
similar = model.approxSimilarityJoin(train, train, 2.0, distCol='EuclideanDistance')

存储进hbase中

import happybase

def save_hbase(partitions):
    pool = happybase.ConnectionPool(size=10, host='hbase-url')
    
    with pool.connection() as conn:
        article_similar = conn.table('article_similar')
        for row in partitions:
            article_similar.put(str(row.datasetA.id).encode(),
                                {'similar:{}'.format(row.datasetB.id).encode(): b'%0.4f' % (row.EuclideanDistance)})
        conn.close()
        
similar.foreachPartition(save_hbase)
3.具体问题

article_vector中的数据量为120w条数据,取出来计算完相似度之后得到similar。但是到了save_hbase()这一步就出现问题了,程序跑的过程中并无报错,spark日志中也没有发现异常,但是最终的hbase中article_similar表中却只有6万条记录数。按理说hbase中存储的记录数应该和article_vector中的数据量一致,可以在hbase中根据每一个id查到这个id的对应的相似数据。实际上只存了6万条左右的id,只能查到六万个id对应的相似信息,为什么会这样,是happybase的存储过程中出了什么问题吗?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

回眸一遍 2022-09-19 22:39:30

与happybase无关,LSH的桶长度设置过小,增大BucketedRandomProjectionLSH中的bucketLength,再增大approxSimilarityJoin中的欧氏距离的阈值。详细的可以查看pyspark.ml.feature中的BucketedRandomProjectionLSH类源码。

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文