How do I exclude duplicate records from a large data source?
I have started working with a large dataset that arrives in JSON format. Unfortunately, the service providing the data feed delivers a non-trivial number of duplicate records. On the upside, each record has a unique ID number stored as a 64-bit positive integer (Java long).
The data arrives once a week, with about 10M records in each delivery. I need to exclude duplicates within the current delivery as well as records that appeared in previous batches.
The brute-force approach to the de-dup problem is to push the ID numbers into a Java Set. Since the Set interface requires uniqueness, a failed insert indicates a duplicate.
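For reference, a minimal sketch of that brute-force approach (the IDs are hard-coded here rather than coming from the parsed JSON; Set.add returns false when the value is already present):

```java
import java.util.HashSet;
import java.util.Set;

public class BruteForceDedup {
    public static void main(String[] args) {
        Set<Long> seenIds = new HashSet<>();
        // In the real pipeline these would come from the parsed JSON records.
        long[] incomingIds = {42L, 7L, 42L};
        for (long id : incomingIds) {
            // add() returns false if the ID was already in the set, i.e. a duplicate
            if (!seenIds.add(id)) {
                System.out.println("Duplicate record, skipping ID " + id);
            } else {
                System.out.println("New record, keeping ID " + id);
            }
        }
    }
}
```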
The question is: Is there a better way to look for a duplicate long as I import records?
I am using Hadoop to mine the data, so if there is a good way to use Hadoop to de-dup the records, that would be a bonus.
3 Answers
Could you create a MapReduce task where the map output has a key of the unique ID number? That way, in your reduce task, you will be passed an iterator of all the values with that ID number. Output only the first value and your reduced output will be free of duplicates.
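A rough sketch of such a job against the org.apache.hadoop.mapreduce API, assuming records arrive one JSON document per line; the extractId helper is a placeholder for whatever JSON parsing you already do:

```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class DedupJob {

    // Map: key each record by its unique ID so all copies meet at the same reducer.
    public static class DedupMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        private final LongWritable outKey = new LongWritable();

        @Override
        protected void map(LongWritable offset, Text jsonLine, Context context)
                throws IOException, InterruptedException {
            long id = extractId(jsonLine.toString()); // hypothetical JSON parsing helper
            outKey.set(id);
            context.write(outKey, jsonLine);
        }

        private long extractId(String json) {
            // placeholder: real code would use a JSON parser to read the record's ID field
            throw new UnsupportedOperationException("implement with your JSON library");
        }
    }

    // Reduce: emit only the first record seen for each ID, discarding duplicates.
    public static class DedupReducer extends Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        protected void reduce(LongWritable id, Iterable<Text> records, Context context)
                throws IOException, InterruptedException {
            for (Text record : records) {
                context.write(id, record);
                break; // keep the first value only
            }
        }
    }
}
```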
Let me see. Each java.lang.Long takes 24 bytes. Each HashMap$Entry takes 24 bytes as well, and the array slot in the HashMap takes 4 bytes per entry. So you have 52 * 10M ≈ 520 MB of heap storage for the map. This is for the 10M records of one week, though. If you are on a 64-bit system, you can just set the heap size to 5 GB and see how far you get.
There should be other implementations of a java.util.Set that only consume about 16 bytes per entry, so you could handle three times as much data as with a java.util.HashSet. I've written one myself, but I cannot share it. You may try GNU Trove instead.
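For example, a primitive-collection sketch along those lines using GNU Trove's TLongHashSet (the package name assumes Trove 3.x); it stores raw longs, so there is no per-element Long or HashMap$Entry overhead:

```java
import gnu.trove.set.hash.TLongHashSet;

public class PrimitiveDedup {
    public static void main(String[] args) {
        // Pre-size for one week's worth of IDs; stores primitives, no boxing.
        TLongHashSet seenIds = new TLongHashSet(10_000_000);
        long[] incomingIds = {42L, 7L, 42L};
        for (long id : incomingIds) {
            // add() returns false when the ID is already present
            if (!seenIds.add(id)) {
                System.out.println("Duplicate ID " + id);
            }
        }
    }
}
```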
You have to keep a list of unique IDs in HDFS and rebuild it after every batch load.
Since the cardinality in your case is quite large (you can expect > 1B unique records in one year), your unique ID list needs to be split into multiple parts, say N. The partitioning algorithm is domain specific. The general approach is to convert each ID into a long hash string (16 bytes is fine) and create 2^k buckets (see the sketch after the list below):
For k = 8, for example:
bucket #1 contains all IDs whose hash value starts with 0
bucket #2 contains all IDs whose hash value starts with 1
...
bucket #256 contains all IDs whose hash value starts with 255
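One possible sketch of that partitioning step (the choice of MD5 is an assumption; any stable 16-byte hash works): hash the 64-bit ID and take the first byte as the bucket number for k = 8.

```java
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class BucketAssigner {
    // Hash the 64-bit ID (MD5 yields 16 bytes) and use the first byte as the bucket: 0..255 for k = 8.
    public static int bucketFor(long id) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] hash = md5.digest(ByteBuffer.allocate(Long.BYTES).putLong(id).array());
            return hash[0] & 0xFF;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ID 42 goes to bucket #" + bucketFor(42L));
    }
}
```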
For every new batch you receive, run the dedupe job first: the Map reads a record, takes its ID, hashes it, and outputs Key = bucket# (0..255 in our case) and Value = ID. Each reducer receives all IDs for a given bucket. The Reducer loads ALL unique IDs for that bucket already known in your system into an internal Set and checks ALL incoming record IDs against this internal Set. If a record's ID is not yet known, it updates the internal Set and outputs the record.
On reducer close, you write the internal Set of unique IDs back to HDFS.
By splitting the whole set of IDs into a number of buckets, you create a solution that scales well.
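A sketch of what such a reducer could look like, assuming one bucket per reducer and a hypothetical /dedup/bucket-NNN layout for the known-ID files in HDFS (loading the existing IDs in setup is only stubbed out here):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;

import gnu.trove.set.hash.TLongHashSet;

// One reducer handles one bucket: it filters incoming IDs against the bucket's
// known-ID set and rewrites that set back to HDFS on close.
public class BucketDedupReducer extends Reducer<IntWritable, LongWritable, LongWritable, LongWritable> {

    private final TLongHashSet knownIds = new TLongHashSet();
    private int bucket = -1;

    @Override
    protected void setup(Context context) throws IOException {
        // Assumption: the known IDs for each bucket live under /dedup/bucket-NNN.
        // Loading them into knownIds is left as a stub; the real layout is up to you.
    }

    @Override
    protected void reduce(IntWritable bucketKey, Iterable<LongWritable> ids, Context context)
            throws IOException, InterruptedException {
        bucket = bucketKey.get();
        for (LongWritable id : ids) {
            // add() returns true only for IDs never seen before, in this batch or any earlier one
            if (knownIds.add(id.get())) {
                context.write(id, id); // in a real job you would emit the full record here
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        if (bucket < 0) {
            return; // this reducer received no data
        }
        // Write the merged set of unique IDs for this bucket back to HDFS.
        Configuration conf = context.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        Path out = new Path("/dedup/bucket-" + bucket); // hypothetical path
        try (FSDataOutputStream stream = fs.create(out, true)) {
            for (long id : knownIds.toArray()) {
                stream.writeLong(id);
            }
        }
    }
}
```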