获取kafka的数据,处理完之后的一些重复数据问题,使用外部介质redis

发布于 2021-12-03 13:00:41 字数 485 浏览 834 评论 8

需求:kafka消费去重

环境:dubbo分布式部署,redis集群,kafka队列

模拟数据:10条相同数据

思路:多线程从kafka获取数据处理,从redis获取标识A,如果存放的标识A不存在,那么处理一次,如果存放的标识A存在,那么取出值(json串)中的时间和系统时间对比,如果(当前时间-redis存放的时间)<=10S,只处理一次,后面9次丢弃,反之全部处理

图形:

if(当前系统时间-redis存放时间<=10S){

   //只处理一次,10条数据只处理一次,其他9条数据丢弃

}else{

  //全部处理

}

问题:分布式节点部署,而且还是多线程从kafka获取数据,同时在执行,经过测试,同时在redis拿数据,比如redis存放的处理时间是2017-08-17 15:00:00,其他线程也是拿到的是这个时间,造成重复消费...

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(8

情痴 2021-12-05 06:13:05

那你就直接对得到的消息进行唯一码生成然后借助redis的setnx进行去重处理,过期的设置只是为了防止redis里面数据太多

陌上芳菲 2021-12-05 06:11:53

引用来自“CoCo丶Hu”的评论

1、kafka获取的消息的字符串进行MD5

2、用这个MD5作为key对redis进行setnx,并设置key的过期时间

瑾夏年华 2021-12-05 06:10:41

引用来自“CoCo丶Hu”的评论

1、kafka获取的消息的字符串进行MD5

2、用这个MD5作为key对redis进行setnx,并设置key的过期时间

彩扇题诗 2021-12-05 05:59:22

key主要是为了保证消息的唯一性,就是说如果是相同的消息,你要算出来一个同样的key。而这个唯一性的key值算出后是不是接下来处理,用redis的 setnx进行处理

醉酒的小男人 2021-12-05 05:54:45

这个key 可以是 (topic + payload)的MD5

樱花落人离去 2021-12-05 05:23:51

引用来自“CoCo丶Hu”的评论

1、kafka获取的消息的字符串进行MD5

2、用这个MD5作为key对redis进行setnx,并设置key的过期时间

岁吢 2021-12-04 23:29:17

1、kafka获取的消息的字符串进行MD5

2、用这个MD5作为key对redis进行setnx,并设置key的过期时间

躲猫猫 2021-12-04 05:27:19

请各位大大帮帮我,我公司不能上传截图...扎心了...

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文