如何动态扩展flume的avro sink?
我有几个服务实例,部署了flume进行日志采集,这些flume将会通过avro sink,将日志传输到flume-master节点进行分发,目前flume-master有两个实例。
我现在遇到的问题是,在请求量上升后,两个flume-master实例已经无法处理所有请求,会产生堆积,我希望能水平扩展flume-master。
目前使用的是下面的负载均衡的形式,将日志sink到两个master实例,但是在需要扩展master节点的时候,需要手动修改flume的配置,加多一个sink,这种方式并不友好。
agent.sources = source
agent.channels = fileChannel
agent.sinks = avroSink1 avroSink2
agent.sources.source.type = TAILDIR
agent.sources.source.filegroups = group1
agent.sources.source.filegroups.group1 = /home/xx/logs/xx/xx.log
agent.sources.source.positionFile = /home/xx/data/flume-xx/taildir/position.json
agent.sources.source.skipToEnd = true
agent.sources.source.channels = fileChannel
agent.sinks.avroSink1.type = avro
agent.sinks.avroSink1.hostname = HOST_1
agent.sinks.avroSink1.port = 4143
agent.sinks.avroSink1.channel = fileChannel
agent.sinks.avroSink2.type = avro
agent.sinks.avroSink2.hostname = HOST_2
agent.sinks.avroSink2.port = 4143
agent.sinks.avroSink2.channel = fileChannel
agent.sinkgroups = avroGroup
agent.sinkgroups.avroGroup.sinks = avroSink1 avroSink2
agent.sinkgroups.avroGroup.processor.type = load_balance
agent.sinkgroups.avroGroup.processor.backoff = true
agent.sinkgroups.avroGroup.processor.selector = round_robin
agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /home/acs/data/flume-xx/checkpoint
agent.channels.fileChannel.dataDirs = /home/xx/data/flume-xx/data
agent.channels.fileChannel.checkpointInterval = 2
agent.channels.fileChannel.capacity = 400000
那么是否可以使用一个代理的HOST来代理所有的flume master节点,这样在对master进行水平扩展的时候,就不需要修改服务上的flume。但经过测试后发现这个方法无法生效。
具体原因如下:
sink第一次传递event的时候通过代理地址拿到了真实地址,并创建了一个长连接,导致sink只会往一个avro地址传递event,直到该地址失效。
然后,我又注意到avro sink中有reset-connection-interval 这个参数,这个参数规定了强制重置avro sink的间隔,看起来似乎可以达到负载均衡的目的,但实际上还是不行的,存在明显的缺陷:
- avro sink本身创建了长连接,如果为了达到负载均衡的目的,不断重新创建连接,会极大地影响性能
- 连接会一直创建销毁重新创建,但始终只有一个连接存在,在那段时间内该连接承担所有流量
agent.sources = source
agent.channels = fileChannel
agent.sinks = avroSink
agent.sources.source.type = TAILDIR
agent.sources.source.filegroups = group1
agent.sources.source.filegroups.group1 = /home/xx/logs/xx/xx.log
agent.sources.source.positionFile = /home/xx/data/flume-xx/taildir/position.json
agent.sources.source.skipToEnd = true
agent.sources.source.channels = fileChannel
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = HOST #HOST 为负载均衡的地址,代理了所有的master节点
agent.sinks.avroSink.port = 4143
agent.sinks.avroSink.channel = fileChannel
agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /home/acs/data/flume-xx/checkpoint
agent.channels.fileChannel.dataDirs = /home/xx/data/flume-xx/data
agent.channels.fileChannel.checkpointInterval = 2
agent.channels.fileChannel.capacity = 400000
以上是对动态扩展avor sink的尝试,最终目的是为了友好的水平扩展以应对流量的增长,所以如果不局限于avro sink,可以通过引入中间件kafka的形式来应对流量突增,因为kafka sink的bootstrap.servers 是一个broker的发现地址,类型负载均衡的实现
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论