如何动态扩展flume的avro sink?

发布于 2022-09-12 03:45:33 字数 2965 浏览 26 评论 0

我有几个服务实例,部署了flume进行日志采集,这些flume将会通过avro sink,将日志传输到flume-master节点进行分发,目前flume-master有两个实例。
我现在遇到的问题是,在请求量上升后,两个flume-master实例已经无法处理所有请求,会产生堆积,我希望能水平扩展flume-master。

目前使用的是下面的负载均衡的形式,将日志sink到两个master实例,但是在需要扩展master节点的时候,需要手动修改flume的配置,加多一个sink,这种方式并不友好。

agent.sources = source
agent.channels = fileChannel
agent.sinks = avroSink1 avroSink2

agent.sources.source.type = TAILDIR
agent.sources.source.filegroups = group1
agent.sources.source.filegroups.group1 = /home/xx/logs/xx/xx.log
agent.sources.source.positionFile = /home/xx/data/flume-xx/taildir/position.json
agent.sources.source.skipToEnd = true
agent.sources.source.channels = fileChannel


agent.sinks.avroSink1.type = avro
agent.sinks.avroSink1.hostname = HOST_1
agent.sinks.avroSink1.port = 4143
agent.sinks.avroSink1.channel = fileChannel

agent.sinks.avroSink2.type = avro
agent.sinks.avroSink2.hostname = HOST_2
agent.sinks.avroSink2.port = 4143
agent.sinks.avroSink2.channel = fileChannel

agent.sinkgroups = avroGroup
agent.sinkgroups.avroGroup.sinks = avroSink1 avroSink2
agent.sinkgroups.avroGroup.processor.type = load_balance
agent.sinkgroups.avroGroup.processor.backoff = true
agent.sinkgroups.avroGroup.processor.selector = round_robin


agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /home/acs/data/flume-xx/checkpoint
agent.channels.fileChannel.dataDirs = /home/xx/data/flume-xx/data
agent.channels.fileChannel.checkpointInterval = 2
agent.channels.fileChannel.capacity = 400000

那么是否可以使用一个代理的HOST来代理所有的flume master节点,这样在对master进行水平扩展的时候,就不需要修改服务上的flume。但经过测试后发现这个方法无法生效。
具体原因如下:
sink第一次传递event的时候通过代理地址拿到了真实地址,并创建了一个长连接,导致sink只会往一个avro地址传递event,直到该地址失效。
然后,我又注意到avro sink中有reset-connection-interval 这个参数,这个参数规定了强制重置avro sink的间隔,看起来似乎可以达到负载均衡的目的,但实际上还是不行的,存在明显的缺陷:

  • avro sink本身创建了长连接,如果为了达到负载均衡的目的,不断重新创建连接,会极大地影响性能
  • 连接会一直创建销毁重新创建,但始终只有一个连接存在,在那段时间内该连接承担所有流量
agent.sources = source
agent.channels = fileChannel
agent.sinks = avroSink

agent.sources.source.type = TAILDIR
agent.sources.source.filegroups = group1
agent.sources.source.filegroups.group1 = /home/xx/logs/xx/xx.log
agent.sources.source.positionFile = /home/xx/data/flume-xx/taildir/position.json
agent.sources.source.skipToEnd = true
agent.sources.source.channels = fileChannel


agent.sinks.avroSink.type = avro
agent.sinks.avroSink.hostname = HOST #HOST 为负载均衡的地址,代理了所有的master节点
agent.sinks.avroSink.port = 4143
agent.sinks.avroSink.channel = fileChannel


agent.channels.fileChannel.type = file
agent.channels.fileChannel.checkpointDir = /home/acs/data/flume-xx/checkpoint
agent.channels.fileChannel.dataDirs = /home/xx/data/flume-xx/data
agent.channels.fileChannel.checkpointInterval = 2
agent.channels.fileChannel.capacity = 400000

以上是对动态扩展avor sink的尝试,最终目的是为了友好的水平扩展以应对流量的增长,所以如果不局限于avro sink,可以通过引入中间件kafka的形式来应对流量突增,因为kafka sink的bootstrap.servers 是一个broker的发现地址,类型负载均衡的实现

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文