Writing a custom Flume decorator, but getting an error. What am I missing?
I'm writing a custom decorator plugin for Cloudera's distributed log aggregation system, Flume. My Java code is below:
package multiplex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

public class JsonMultiplexDecorator<S extends EventSink> extends EventSinkDecorator<S> {
    private final String serverName;
    private final String logType;

    public JsonMultiplexDecorator(S s, String serverName, String logType) {
        super(s);
        this.serverName = serverName;
        this.logType = logType;
    }

    @Override
    public void append(Event e) throws IOException {
        String body = new String(e.getBody()).replaceAll("\"", "\\\"");
        String json = "{ \"server\": \"" + this.serverName + "\"," +
                "\"log_type\": \"" + this.logType + "\", " +
                "\"body\": \"" + body + "\" }";
        EventImpl e2 = new EventImpl(json.getBytes(),
                e.getTimestamp(), e.getPriority(), e.getNanos(), e.getHost(),
                e.getAttrs());
        super.append(e2);
    }

    public static SinkDecoBuilder builder() {
        return new SinkDecoBuilder() {
            @Override
            public EventSinkDecorator<EventSink> build(Context context,
                    String... argv) {
                Preconditions.checkArgument(argv.length == 2,
                        "usage: multiplexDecorator(serverName, logType)");
                return new JsonMultiplexDecorator<EventSink>(null, argv[0], argv[1]);
            }
        };
    }

    public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
        List<Pair<String, SinkDecoBuilder>> builders =
                new ArrayList<Pair<String, SinkDecoBuilder>>();
        builders.add(new Pair<String, SinkDecoBuilder>("jsonMultiplexDecorator", builder()));
        return builders;
    }
}
This compiles fine into a JAR file with ant, and I can load it into Flume at runtime and successfully configure nodes to use it. However, when an event actually comes through on a node that has this plugin loaded, I get errors like this in my log:
2010-10-19 21:03:15,176 [logicalNode xxxxx] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because null
java.lang.UnsupportedOperationException
at java.util.Collections$UnmodifiableMap.put(Collections.java:1285)
at com.cloudera.flume.core.EventBaseImpl.set(EventBaseImpl.java:65)
at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:164)
at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.append(DiskFailoverDeco.java:93)
at com.cloudera.flume.core.BackOffFailOverSink.append(BackOffFailOverSink.java:144)
at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:109)
at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
at multiplex.JsonMultiplexDecorator.append(JsonMultiplexDecorator.java:56)
at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:69)
at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)
(The xxxxx in [logicalNode xxxxx] is a placeholder for an EC2 internal DNS name.) I don't have much Java experience, so I'm not sure whether I'm doing something wrong here or whether this is a Flume bug. I should mention that I based this on the HelloWorld plugin example from the Flume source and also drew on some of the built-in Flume decorators.
Comments (1)
When you construct EventImpl e2, you are passing e.getAttrs(), which is unmodifiable. Try copying e.getAttrs() into a map of your own; a shallow copy using new HashMap(e.getAttrs()) should be sufficient.
Reference: https://groups.google.com/a/cloudera.org/group/flume-user/browse_thread/thread/046b4a446877c8f9?pli=1
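
To make the failure mode concrete, here is a minimal standalone sketch that uses only java.util and no Flume classes. It assumes the attribute map has the shape Map<String, byte[]>; the class name AttrsCopyDemo, the helper names tagEvent and mutableCopy, and the "demoTag" key are hypothetical and exist only for illustration. tagEvent stands in for a downstream sink that calls put on the event's attributes, which is what the stack trace suggests RollSink does via EventBaseImpl.set.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AttrsCopyDemo {

    // Hypothetical stand-in for a downstream sink that adds an attribute
    // to the event it receives.
    public static void tagEvent(Map<String, byte[]> attrs) {
        attrs.put("demoTag", "demo".getBytes());
    }

    // Shallow copy: a new, writable map that shares the same byte[] values.
    public static Map<String, byte[]> mutableCopy(Map<String, byte[]> attrs) {
        return new HashMap<String, byte[]>(attrs);
    }

    public static void main(String[] args) {
        Map<String, byte[]> original = new HashMap<String, byte[]>();
        original.put("source", "web01".getBytes());

        // Read-only view, similar in behavior to the map in the stack trace.
        Map<String, byte[]> readOnly = Collections.unmodifiableMap(original);

        try {
            tagEvent(readOnly);
            System.out.println("put succeeded");
        } catch (UnsupportedOperationException ex) {
            // Same exception type as in the question's log.
            System.out.println("read-only map rejected put");
        }

        // Copying first gives the downstream code a map it may modify.
        Map<String, byte[]> copy = mutableCopy(readOnly);
        tagEvent(copy);
        System.out.println("copy has " + copy.size() + " attributes");
    }
}
```

Applied to the decorator in the question, the equivalent change would be to pass a copy such as new HashMap<String, byte[]>(e.getAttrs()) as the last argument to the EventImpl constructor instead of e.getAttrs() itself, plus an import of java.util.HashMap. The generic type parameters here are an assumption about the Flume version in use.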