Flume interceptor: host setting does not take effect

Posted on 2022-09-03 14:59:25

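I am using flume-ng to ship log data, with the agents pushing data to the collectors in a load-balanced fashion. The data flow is shown in the figure:
[Figure: data flow from the agents to the collectors]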

To distinguish the data sent by different agents, I configured a host interceptor in the collectors' config files. The config file of one of the agents is as follows:

#name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = flume_test

#define sinkgroups
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=k1 k2
a1.sinkgroups.g1.processor.type=load_balance
a1.sinkgroups.g1.processor.backoff=true
a1.sinkgroups.g1.processor.selector=round_robin

#define the sink 1
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=10.0.3.82
a1.sinks.k1.port=5150

#define the sink 2
a1.sinks.k2.type=avro
a1.sinks.k2.hostname=10.0.3.83
a1.sinks.k2.port=5150


# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel=c1

The config file for collector1:

collector1.sources = r1
collector1.channels = c1 c2
collector1.sinks = k1 k2

# Describe the source
collector1.sources.r1.type = avro
collector1.sources.r1.port = 5150
collector1.sources.r1.bind = 0.0.0.0
collector1.sources.r1.channels = c1 c2
collector1.sources.r1.interceptors = i2
collector1.sources.r1.interceptors.i2.type = host
collector1.sources.r1.interceptors.i2.hostHeader = agentHost

# Describe the channels: c1 is a file channel, c2 buffers events in memory
collector1.channels.c1.type = file
collector1.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector1.channels.c1.dataDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/data

collector1.channels.c2.type = memory
collector1.channels.c2.capacity = 1000
collector1.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector1.sinks.k1.type = hdfs
collector1.sinks.k1.channel = c1
collector1.sinks.k1.hdfs.path = /quantone/flume/%{agentHost}
collector1.sinks.k1.hdfs.fileType = DataStream
collector1.sinks.k1.hdfs.writeFormat = TEXT
collector1.sinks.k1.hdfs.rollInterval = 300
collector1.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%{agentHost}
collector1.sinks.k1.hdfs.round = true
collector1.sinks.k1.hdfs.roundValue = 5
collector1.sinks.k1.hdfs.roundUnit = minute
collector1.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector1.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector1.sinks.k2.channel = c2
collector1.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector1.sinks.k2.requiredAcks = 1
collector1.sinks.k2.batchSize = 20
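
To check whether the `agentHost` header is actually present on events arriving at the collector, one option is a temporary logger sink, which writes each event's headers and body to the Flume log at INFO level. The sketch below assumes a third channel `c3` and a sink `k3`. Neither name is in the original config; both are added only for debugging. The three list lines would replace the corresponding component lists in the collector1 config above.

```properties
# Debug-only additions (c3 and k3 are hypothetical names, not part of the original setup).
collector1.channels = c1 c2 c3
collector1.sinks = k1 k2 k3
# With the default replicating channel selector, the source copies every event to all listed channels.
collector1.sources.r1.channels = c1 c2 c3

# A small in-memory channel that feeds only the debug sink.
collector1.channels.c3.type = memory
collector1.channels.c3.capacity = 1000
collector1.channels.c3.transactionCapacity = 100

# The logger sink logs each event, including its headers.
collector1.sinks.k3.type = logger
collector1.sinks.k3.channel = c3
```

If the logged headers contain no `agentHost` entry, the interceptor is probably not being applied at the source. If the entry is there, the problem more likely lies further along, in how the HDFS sink resolves the path.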

The config file for collector2:

collector2.sources = r1
collector2.channels = c1 c2
collector2.sinks = k1 k2

# Describe the source
collector2.sources.r1.type = avro
collector2.sources.r1.port = 5150
collector2.sources.r1.bind = 0.0.0.0
collector2.sources.r1.channels = c1 c2
collector2.sources.r1.interceptors = i2
collector2.sources.r1.interceptors.i2.type = host
collector2.sources.r1.interceptors.i2.hostHeader = agentHost

# Describe the channels: c1 is a file channel, c2 buffers events in memory
collector2.channels.c1.type = file
collector2.channels.c1.checkpointDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/checkpoint
collector2.channels.c1.dataDir = /usr/local/apache-flume-1.6.0-bin/fileChannel/data

collector2.channels.c2.type = memory
collector2.channels.c2.capacity = 1000
collector2.channels.c2.transactionCapacity = 100

# Describe the sink k1 to hadoop
collector2.sinks.k1.type = hdfs
collector2.sinks.k1.channel = c1
collector2.sinks.k1.hdfs.path = /quantone/flume/%{agentHost}
collector2.sinks.k1.hdfs.fileType = DataStream
collector2.sinks.k1.hdfs.writeFormat = TEXT
collector2.sinks.k1.hdfs.rollInterval = 300
collector2.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%{agentHost}
collector2.sinks.k1.hdfs.round = true
collector2.sinks.k1.hdfs.roundValue = 5
collector2.sinks.k1.hdfs.roundUnit = minute
collector2.sinks.k1.hdfs.useLocalTimeStamp = true

# Describe the sink k2 to kafka
collector2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
collector2.sinks.k2.channel = c2
collector2.sinks.k2.brokerList = 10.0.3.178:9092,10.0.3.179:9092
collector2.sinks.k2.requiredAcks = 1
collector2.sinks.k2.batchSize = 20

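In the config files for collector1 and collector2, I use %{agentHost} in the HDFS path and file prefix to insert the agent's IP address, but in the resulting paths and file names this field is empty.
Can anyone point me in the right direction? Thanks!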
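
A point that may matter here: according to the Flume user guide, the host interceptor inserts the hostname or IP address of the host that the agent itself runs on. Placed on the collector's Avro source, it would therefore stamp the collector's own address rather than the sending agent's. A minimal sketch of one alternative, assuming the goal is to have the sending agent's IP in `agentHost`, is to add the interceptor on the agent's source and let the header travel with the event through the Avro sink. The interceptor name `i2` on the agent is chosen only for illustration. This has not been verified against the setup above and may not by itself explain the empty value.

```properties
# Agent side (a1): keep the existing static interceptor i1 and add a host interceptor i2.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = topic
a1.sources.r1.interceptors.i1.value = flume_test
# Assumption: i2 is an illustrative name; it stamps each event with this agent's own address.
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = agentHost
# useIP defaults to true; set explicitly so the header holds the IP rather than the hostname.
a1.sources.r1.interceptors.i2.useIP = true

# Collector side: if the host interceptor is kept there, preserveExisting = true
# stops it from overwriting the agentHost header already set by the agent.
collector1.sources.r1.interceptors.i2.preserveExisting = true
```

With this in place, `%{agentHost}` in `hdfs.path` and `hdfs.filePrefix` on the collector would resolve from the header set on the agent, provided the header reaches the HDFS sink. The same `preserveExisting` line would apply to collector2.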
