- Socket 编程发展
- OpenResty 简介
- Lua 入门
- Nginx
- 子查询
- 不同阶段共享变量
- 防止 SQL 注入
- 如何发起新 HTTP 请求
- 访问有授权验证的 Redis
- select+set_keepalive 组合操作引起的数据读写错误
- redis 接口的二次封装(简化建连、拆连等细节)
- redis 接口的二次封装(发布订阅)
- pipeline 压缩请求数量
- script 压缩复杂请求
- 动态生成的 lua-resty-redis 模块方法
- LuaCjsonLibrary
- json解析的异常捕获
- 稀疏数组
- 空table编码为array还是object
- PostgresNginxModule
- 调用方式简介
- 不支持事务
- 超时
- 健康监测
- SQL注入
- LuaNginxModule
- 执行阶段概念
- 正确的记录日志
- 热装载代码
- 阻塞操作
- 缓存
- sleep
- 定时任务
- 禁止某些终端访问
- 请求返回后继续执行
- 调试
- 请求中断后的处理
- 我的 lua 代码需要调优么
- 变量的共享范围
- 动态限速
- shared.dict 非队列性质
- 正确使用长链接
- 如何引用第三方 resty 库
- 典型应用场景
- 怎样理解 cosocket
- 如何安全启动唯一实例的 timer
- 如何正确的解析域名
- LuaRestyDNSLibrary
- 使用动态 DNS 来完成 HTTP 请求
- LuaRestyLock
- 缓存失效风暴
- HTTPS 时代
- 动态加载证书和 OCSP stapling
- TLS session resumption
- 测试
- Web 服务
- 火焰图
- 如何定位问题
- module 是邪恶的
- FFI
- 什么是 JIT
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
redis 接口的二次封装(发布订阅)
其实这一小节完全可以放到上一个小节,只是这里用了完全不同的玩法,所以我还是决定单拿出来分享一下这方面的小细节。
上一小节有关订阅部分的代码,请看:
function _M.subscribe( self, channel )
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end
local res, err = redis:subscribe(channel)
if not res then
return nil, err
end
res, err = redis:read_reply()
if not res then
return nil, err
end
redis:unsubscribe(channel)
self.set_keepalive_mod(redis)
return res, err
end
其实这里的实现是有问题的,各位看官,你能发现这段代码的问题么?给个提示,在高并发订阅场景下,极有可能存在漏掉部分订阅信息。原因在于每次订阅到内容后,都会把 Redis 对象进行释放,处理完订阅信息后再次去连接 Redis,在这个时间差里面,很可能有消息已经漏掉了。
通过下面的代码可以解决这个问题:
function _M.subscribe( self, channel )
local redis, err = redis_c:new()
if not redis then
return nil, err
end
local ok, err = self:connect_mod(redis)
if not ok or err then
return nil, err
end
local res, err = redis:subscribe(channel)
if not res then
return nil, err
end
local function do_read_func ( do_read )
if do_read == nil or do_read == true then
res, err = redis:read_reply()
if not res then
return nil, err
end
return res
end
redis:unsubscribe(channel)
self.set_keepalive_mod(redis)
return
end
return do_read_func
end
调用示例代码:
local red = redis:new({timeout=1000})
local func = red:subscribe( "channel" )
if not func then
return nil
end
while true do
local res, err = func()
if err then
func(false)
end
... ...
end
return cbfunc
另一个潜在的问题是,调用了 unsubscribe
之后,Redis 对象里面有可能还遗留没被读取的数据。在这种情况下,无法直接通过 set_keepalive_mod
复用连接。什么时候会发生这样的情况呢?
当 Redis 对象处于 subscribe 状态时,Redis 会给它推送订阅的消息,然后我们通过 read_reply
把消息读出来。调用 unsubscribe
的时候,只是退订了对应的频道,并不会把当前接收到的数据清空。如果要想复用该连接,我们就需要保证清空当前读取到的数据,保证它是干净的。就像这样:
local res, err = red:unsubscribe("ch")
if not res then
ngx.log(ngx.ERR, err)
return
else
-- redis 推送的消息格式,可能是
-- {"message", ...} 或
-- {"unsubscribe", $channel_name, $remain_channel_num}
-- 如果返回的是前者,说明我们还在读取 Redis 推送过的数据
if res[1] ~= "unsubscribe" then
repeat
-- 需要抽空已经接收到的消息
res, err = red:read_reply()
if not res then
ngx.log(ngx.ERR, err)
return
end
until res[1] == "unsubscribe"
end
-- 现在再复用连接,就足够安全了
self.set_keepalive_mod(redis)
end
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论