获取kafka的数据,处理完之后的一些重复数据问题,使用外部介质redis
需求:kafka消费去重
环境:dubbo分布式部署,redis集群,kafka队列
模拟数据:10条相同数据
思路:多线程从kafka获取数据处理,从redis获取标识A,如果存放的标识A不存在,那么处理一次,如果存放的标识A存在,那么取出值(json串)中的时间和系统时间对比,如果(当前时间-redis存放的时间)<=10S,只处理一次,后面9次丢弃,反之全部处理
图形:
if(当前系统时间-redis存放时间<=10S){
//只处理一次,10条数据只处理一次,其他9条数据丢弃
}else{
//全部处理
}
问题:分布式节点部署,而且还是多线程从kafka获取数据,同时在执行,经过测试,同时在redis拿数据,比如redis存放的处理时间是2017-08-17 15:00:00,其他线程也是拿到的是这个时间,造成重复消费...
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(8)
那你就直接对得到的消息进行唯一码生成然后借助redis的setnx进行去重处理,过期的设置只是为了防止redis里面数据太多
引用来自“CoCo丶Hu”的评论
1、kafka获取的消息的字符串进行MD5
2、用这个MD5作为key对redis进行setnx,并设置key的过期时间
引用来自“CoCo丶Hu”的评论
1、kafka获取的消息的字符串进行MD5
2、用这个MD5作为key对redis进行setnx,并设置key的过期时间
key主要是为了保证消息的唯一性,就是说如果是相同的消息,你要算出来一个同样的key。而这个唯一性的key值算出后是不是接下来处理,用redis的 setnx进行处理
这个key 可以是 (topic + payload)的MD5
引用来自“CoCo丶Hu”的评论
1、kafka获取的消息的字符串进行MD5
2、用这个MD5作为key对redis进行setnx,并设置key的过期时间
1、kafka获取的消息的字符串进行MD5
2、用这个MD5作为key对redis进行setnx,并设置key的过期时间
请各位大大帮帮我,我公司不能上传截图...扎心了...