Writing a custom Flume decorator and getting an error. What am I missing?

Posted on 2024-09-28 04:32:18

I'm writing a custom decorator plugin for Flume, Cloudera's distributed log aggregation system. My Java code is below:

package multiplex;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkDecoBuilder;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
import com.cloudera.flume.core.EventSink;
import com.cloudera.flume.core.EventSinkDecorator;
import com.cloudera.util.Pair;
import com.google.common.base.Preconditions;

public class JsonMultiplexDecorator<S extends EventSink> extends EventSinkDecorator<S> {
  private final String serverName;
  private final String logType;

  public JsonMultiplexDecorator(S s, String serverName, String logType) {
    super(s);

    this.serverName = serverName;
    this.logType = logType;
  }

  @Override
  public void append(Event e) throws IOException {
    String body = new String(e.getBody()).replaceAll("\"", "\\\"");

    String json = "{ \"server\": \"" + this.serverName + "\"," +
      "\"log_type\": \"" + this.logType + "\", " +
      "\"body\": \"" + body + "\" }";

    EventImpl e2 = new EventImpl(json.getBytes(),
        e.getTimestamp(), e.getPriority(), e.getNanos(), e.getHost(),
        e.getAttrs());

    super.append(e2);
  }

  public static SinkDecoBuilder builder() {
    return new SinkDecoBuilder() {
      @Override
      public EventSinkDecorator<EventSink> build(Context context,
          String... argv) {
        Preconditions.checkArgument(argv.length == 2,
            "usage: multiplexDecorator(serverName, logType)");

        return new JsonMultiplexDecorator<EventSink>(null, argv[0], argv[1]);
      }
    };
  }

  public static List<Pair<String, SinkDecoBuilder>> getDecoratorBuilders() {
    List<Pair<String, SinkDecoBuilder>> builders = 
      new ArrayList<Pair<String, SinkDecoBuilder>>();

    builders.add(new Pair<String, SinkDecoBuilder>("jsonMultiplexDecorator", builder()));

    return builders;
  }
}

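This compiles fine into a JAR file with ant, and I can load it into Flume at runtime and successfully configure nodes to use it. However, when an event actually comes through on a node that has this plugin loaded, I get errors like this in my log: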

2010-10-19 21:03:15,176 [logicalNode xxxxx] ERROR connector.DirectDriver: Driving src/sink failed! LazyOpenSource | LazyOpenDecorator because null
java.lang.UnsupportedOperationException
    at java.util.Collections$UnmodifiableMap.put(Collections.java:1285)
    at com.cloudera.flume.core.EventBaseImpl.set(EventBaseImpl.java:65)
    at com.cloudera.flume.handlers.rolling.RollSink.append(RollSink.java:164)
    at com.cloudera.flume.agent.diskfailover.DiskFailoverDeco.append(DiskFailoverDeco.java:93)
    at com.cloudera.flume.core.BackOffFailOverSink.append(BackOffFailOverSink.java:144)
    at com.cloudera.flume.agent.AgentSink.append(AgentSink.java:109)
    at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
    at multiplex.JsonMultiplexDecorator.append(JsonMultiplexDecorator.java:56)
    at com.cloudera.flume.core.EventSinkDecorator.append(EventSinkDecorator.java:58)
    at com.cloudera.flume.handlers.debug.LazyOpenDecorator.append(LazyOpenDecorator.java:69)
    at com.cloudera.flume.core.connector.DirectDriver$PumperThread.run(DirectDriver.java:92)

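(The [logicalNode xxxxx] is a placeholder for an EC2 internal DNS name.) I don't have much Java experience, so I'm not sure whether I'm doing something wrong here or whether this is a Flume bug. I should mention that I wrote this using the HelloWorld plugin example from the Flume source, and also drew on some of the built-in Flume decorators.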



Comments (1)

优雅的叶子 2024-10-05 04:32:18

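When you construct EventImpl e2, you pass in e.getAttrs(), which is unmodifiable. The stack trace shows the consequence: further down the chain, RollSink.append calls EventBaseImpl.set, which tries to put an entry into that attribute map and fails with UnsupportedOperationException. Try copying e.getAttrs() into a map of your own; a shallow copy using new HashMap(e.getAttrs()) should be sufficient.

Reference: https://groups.google.com/a/cloudera.org/group/flume-user/browse_thread/thread/046b4a446877c8f9?pli=1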
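The behavior described above can be reproduced without Flume. The following is a minimal sketch using only java.util, not Flume's actual code: the class name AttrCopyDemo, the helper methods, and the "rolltag" key are hypothetical names chosen for illustration, and the Map<String, byte[]> type is assumed to mirror the attribute map returned by e.getAttrs().

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class AttrCopyDemo {
  // Returns a mutable shallow copy: a new map holding the same key and value objects.
  static Map<String, byte[]> mutableCopy(Map<String, byte[]> attrs) {
    return new HashMap<String, byte[]>(attrs);
  }

  // Tries to add an attribute and reports whether the map accepted the write.
  // "rolltag" is a made-up key standing in for whatever a downstream sink sets.
  static boolean canPut(Map<String, byte[]> attrs) {
    try {
      attrs.put("rolltag", "example".getBytes());
      return true;
    } catch (UnsupportedOperationException ex) {
      return false;
    }
  }

  public static void main(String[] args) {
    Map<String, byte[]> backing = new HashMap<String, byte[]>();
    backing.put("existing", new byte[] {1});

    // Stand-in for an attribute map that was handed out as read-only.
    Map<String, byte[]> readOnly = Collections.unmodifiableMap(backing);

    System.out.println(canPut(readOnly));              // prints "false"
    System.out.println(canPut(mutableCopy(readOnly))); // prints "true"
  }
}
```

Applied to the decorator in the question, this would presumably mean passing new HashMap<String, byte[]>(e.getAttrs()) instead of e.getAttrs() as the last argument to the EventImpl constructor (plus an import of java.util.HashMap), so that sinks further down the chain can add their own attributes to the new event.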

