Apache Beam Initializer

Posted on 2025-01-18 09:48:51


In my Dataflow job, I need to initialize a config factory and log certain messages to an audit log before actual processing begins.

I have placed the config factory initialization code and the audit logging in a parent class, PlatformInitializer, which my main pipeline class extends.

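package org.devoteam;

import java.beans.PropertyVetoException;
import java.io.Serializable;

public class CustomJob extends PlatformInitializer implements Serializable {
    public static void main(String[] args) throws PropertyVetoException {
        CustomJob myCustomjob = new CustomJob();

        // Initialize config factories (runs in the launcher JVM that calls main())
        myCustomjob.initialize();

        // Trigger the Dataflow job
        myCustomjob.parallelRead(args);
    }

    // parallelRead(String[] args), which builds and runs the pipeline, is not shown
}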

As a result, I also had to implement the Serializable interface in my pipeline class, because Beam was throwing the error java.io.NotSerializableException: org.devoteam.CustomJob.
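Since parallelRead is not shown, the exact cause here is an assumption, but a common one is an anonymous inner class (or a lambda that reads instance state) created inside an instance method. Such a class holds a hidden reference to the enclosing object, and, as far as I know, Beam checks and ships each DoFn using Java serialization, so the enclosing object gets serialized too. This stdlib-only sketch reproduces that mechanism without Beam; Job, SerializableFn and PrefixFn are hypothetical stand-ins for CustomJob, a DoFn, and a static nested DoFn.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class CaptureDemo {
    // Hypothetical stand-in for a serializable user function such as a Beam DoFn.
    interface SerializableFn extends Function<String, String>, Serializable {}

    // Plays the role of CustomJob before "implements Serializable" was added.
    static class Job {
        private final String prefix;

        Job(String prefix) {
            this.prefix = prefix;
        }

        // Anonymous inner class that reads the instance field `prefix`,
        // so it keeps a hidden reference to the enclosing (non-serializable) Job.
        SerializableFn anonymousFn() {
            return new SerializableFn() {
                @Override
                public String apply(String s) {
                    return prefix + s;
                }
            };
        }
    }

    // Static nested class: carries only its own fields, no enclosing instance.
    static class PrefixFn implements SerializableFn {
        private final String prefix;

        PrefixFn(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public String apply(String s) {
            return prefix + s;
        }
    }

    // Returns true if Java serialization succeeds, false on NotSerializableException.
    static boolean serializes(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Prints the exception message naming the Job class, then false.
        System.out.println(serializes(new Job("audit:").anonymousFn()));
        // Prints true.
        System.out.println(serializes(new PrefixFn("audit:")));
    }
}
```

Moving the function into a static nested (or top-level) class, as PrefixFn does, would let CustomJob stay non-serializable. Adding implements Serializable to the outer class also silences the error, but then the whole outer object, including any fields it holds, is serialized along with the function.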

Inside PlatformInitializer, I have an initialize() method that contains the initialization logic for the config factory and also logs some initial audit messages.

package org.devoteam;

public class PlatformInitializer {

    public void initialize() {
        // Configfactory factory = new Configfactory();
        // CustomLogger.log("JOB-BEGIN-EVENT " + java.time.Instant.now());
    }
}

My question is: is this the right way to invoke code that needs to run before the pipeline begins execution?


Comments (2)

萧瑟寒风 2025-01-25 09:48:51


If you need the initialized object at runtime (not at pipeline construction time), you should move your initialization logic into a Beam DoFn. DoFn has a number of method annotations that denote methods to be executed in different lifecycle phases; the @Setup and @StartBundle annotations might be useful for your use case. See here for more details.
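A minimal sketch of that idea, assuming the Beam Java SDK is on the classpath. AuditedFn and its config map are hypothetical stand-ins for the question's config factory. The transient field keeps the config out of the serialized DoFn, @Setup builds it once per DoFn instance on the worker, and @StartBundle resets per-bundle state.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn; the map stands in for the question's config factory.
public class AuditedFn extends DoFn<String, String> {
    // transient: not serialized with the DoFn; rebuilt on the worker in @Setup.
    private transient Map<String, String> config;
    // Per-bundle state, reset in @StartBundle.
    private transient long elementsInBundle;

    @Setup
    public void setup() {
        // Once per DoFn instance, before its first bundle (hypothetical config values).
        config = new HashMap<>();
        config.put("prefix", "audit:");
    }

    @StartBundle
    public void startBundle() {
        // Once per bundle; suited to cheap per-bundle state such as counters or buffers.
        elementsInBundle = 0;
    }

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> out) {
        elementsInBundle++;
        out.output(format(element));
    }

    @Teardown
    public void teardown() {
        // Best-effort cleanup: runners do not guarantee that this is called.
        config = null;
    }

    // Helper kept separate from processElement so the logic is easy to test.
    String format(String element) {
        return config.get("prefix") + element;
    }
}
```

It would be applied with ParDo.of(new AuditedFn()). Because a runner may create many DoFn instances across workers and threads, @Setup runs many times per job, so a one-off JOB-BEGIN-EVENT audit record is arguably still better emitted from main() on the launcher side.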

又怨 2025-01-25 09:48:51

The JvmInitializer interface (https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/harness/JvmInitializer.html) may be appropriate for one-time, process-level initialization. For initializing long-lived or expensive objects (e.g. connections) specific to a DoFn or PTransform, DoFn lifecycle methods are usually more appropriate (see @chamikara's answer).
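A minimal sketch of a JvmInitializer, assuming the Beam Java SDK is on the classpath; PlatformJvmInitializer and the INITIALIZED flag are hypothetical. Implementations are discovered through java.util.ServiceLoader, so the class has to be registered, for example with a resource file META-INF/services/org.apache.beam.sdk.harness.JvmInitializer containing its fully qualified name, or with @AutoService(JvmInitializer.class) if the AutoService annotation processor is in use.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical initializer; must be registered with ServiceLoader to be picked up.
public class PlatformJvmInitializer implements JvmInitializer {
    // Hypothetical process-wide flag; a real job might hold its config factory here.
    static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    @Override
    public void onStartup() {
        // Runs right after the worker JVM starts, before pipeline options are available.
        // The Javadoc recommends beforeProcessing for most initialization.
    }

    @Override
    public void beforeProcessing(PipelineOptions options) {
        // Runs after basic services such as logging are set up, before data processing.
        // compareAndSet makes repeated calls in the same JVM harmless.
        if (INITIALIZED.compareAndSet(false, true)) {
            // Hypothetical: build the config factory and log a worker-start audit event.
        }
    }
}
```

beforeProcessing runs in every worker JVM, so anything logged there is emitted once per worker rather than once per job.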
