使用flume向elasticsearch写日志,下载了jar包后仍然提示有问题?

发布于 2022-09-06 14:59:20 字数 11802 浏览 20 评论 0

我用1.8版本的flume 在java 1.8.0_101 的主机上向elasticsearch 5.0.0传日志,我下载了 elasticsearch-5.0.0.jar 与 lucene-core-5.0.0.jar 放在flume 的lib目录下,启动后仍然提示有问题,这个依赖下载有问题吗?

以下是报错信息

Info: Including Hadoop libraries found via (/usr/local/hadoop/bin/hadoop) for HDFS access
Info: Including Hive libraries found via () for Hive access
+ exec /usr/local/jdk1.8.0_101/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/mnt/flume_Outer_1.8/conf:/mnt/flume_Outer_1.8/lib/*:/usr/local/hadoop-2.6.5/etc/hadoop:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/common/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/hdfs/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/yarn/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/lib/*:/usr/local/hadoop-2.6.5/share/hadoop/mapreduce/*:/usr/local/hadoop/contrib/capacity-scheduler/*.jar:/lib/*' -Djava.library.path=:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib org.apache.flume.node.Application -f conf/infotest -n a1
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/flume_Outer_1.8/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop-2.6.5/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
18/02/08 18:07:49 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
18/02/08 18:07:49 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:conf/infotest
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Added sinks: s1 Agent: a1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Processing:s1
18/02/08 18:07:49 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [a1]
18/02/08 18:07:49 INFO node.AbstractConfigurationProvider: Creating channels
18/02/08 18:07:49 INFO channel.DefaultChannelFactory: Creating instance of channel ch1 type memory
18/02/08 18:07:49 INFO node.AbstractConfigurationProvider: Created channel ch1
18/02/08 18:07:49 INFO source.DefaultSourceFactory: Creating instance of source r1, type exec
18/02/08 18:07:49 INFO sink.DefaultSinkFactory: Creating instance of sink: s1, type: elasticsearch
18/02/08 18:07:49 INFO node.AbstractConfigurationProvider: Channel ch1 connected to [r1, s1]
18/02/08 18:07:49 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:r1,state:IDLE} }} sinkRunners:{s1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6add9eb counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} }
18/02/08 18:07:49 INFO node.Application: Starting Channel ch1
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: ch1 started
18/02/08 18:07:49 INFO node.Application: Starting Sink s1
18/02/08 18:07:49 INFO node.Application: Starting Source r1
18/02/08 18:07:49 INFO source.ExecSource: Exec source starting with command: tail -n +0 -F /mnt/echat-log/info/echat_old/echat_third/echat.log.2018-02-07
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
18/02/08 18:07:49 INFO elasticsearch.ElasticSearchSink: ElasticSearch sink {} started
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: s1: Successfully registered new MBean.
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: s1 started
18/02/08 18:07:49 WARN client.ElasticSearchTransportClient: [192.168.1.4:9200]
18/02/08 18:07:49 ERROR lifecycle.LifecycleSupervisor: Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6add9eb counterGroup:{ name:null counters:{} } } - Exception follows.
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:141)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:358)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/02/08 18:07:49 INFO elasticsearch.ElasticSearchSink: ElasticSearch sink {} stopping
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: s1 stopped
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.start.time == 1518084469218
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.stop.time == 1518084469230
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.batch.complete == 0
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.batch.empty == 0
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.batch.underflow == 0
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.connection.closed.count == 1
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.connection.creation.count == 0
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.connection.failed.count == 0
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.event.drain.attempt == 0
18/02/08 18:07:49 INFO instrumentation.MonitoredCounterGroup: Shutdown Metric for type: SINK, name: s1. sink.event.drain.sucess == 0
18/02/08 18:07:49 WARN lifecycle.LifecycleSupervisor: Component SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6add9eb counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies
18/02/08 18:08:19 ERROR source.ExecSource: Failed while running command: tail -n +0 -F /mnt/echat-log/info/echat_old/echat_third/echat.log.2018-02-07
org.apache.flume.ChannelFullException: Space for commit to queue couldn't be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:128)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
    at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:378)
    at org.apache.flume.source.ExecSource$ExecRunnable.run(ExecSource.java:338)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
18/02/08 18:08:19 ERROR source.ExecSource: Exception occurred when processing event batch
org.apache.flume.ChannelException: java.lang.InterruptedException
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:154)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:194)
    at org.apache.flume.source.ExecSource$ExecRunnable.flushEventBatch(ExecSource.java:378)
    at org.apache.flume.source.ExecSource$ExecRunnable.access$100(ExecSource.java:251)
    at org.apache.flume.source.ExecSource$ExecRunnable$1.run(ExecSource.java:320)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
    at java.util.concurrent.Semaphore.tryAcquire(Semaphore.java:582)
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doCommit(MemoryChannel.java:126)
    at org.apache.flume.channel.BasicTransactionSemantics.commit(BasicTransactionSemantics.java:151)
    ... 11 more

以下是配置文件

a1.channels = ch1
a1.sources = r1
a1.sinks = s1

a1.channels.ch1.type = memory
a1.channels.ch1.capacity = 1000
a1.channels.ch1.transactionCapacity = 1000
a1.channels.ch1.keep-alive = 30

a1.sources.r1.type = exec
a1.sources.r1.shell = /bin/bash -c
a1.sources.r1.command = tail -n +0 -F /mnt/echat-log/info/echat_old/echat_third/echat.log.2018-02-07
a1.sources.r1.channels = ch1
a1.sources.r1.threads = 5
a1.sources.r1.restartThrottle = 100000
a1.sources.r1.restart = true
a1.sources.r1.logStdErr = true

a1.sinks.s1.channel = ch1
a1.sinks.s1.type = elasticsearch
a1.sinks.s1.hostNames = 192.168.1.4:9200
a1.sinks.s1.indexName = foo_index
a1.sinks.s1.indexType = bar_type
a1.sinks.s1.batchSize = 500
a1.sinks.s1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

染墨丶若流云 2022-09-13 14:59:20

经过自己查询很多文章了解到,flume所支持elasticsearch版本较为落后,如果要想使用flume直接向es传输,需要使用低版本的es,而且需要修改jar包中的一些方法。flume的更新速度也远不如es的更新速率,所以应该选取其它的日志收集策略

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文