Apache Beam Initializer
在我的DataFlow作业中,我需要在实际处理开始之前初始化配置工厂并在审核日志中记录某些消息。
我已经将配置工厂初始化代码 +审核记录放在父级 platformInitializer
中,并将其扩展在我的主管道类中。
public class CustomJob extends PlatformInitializer implements Serializable {
public static void main(String[] args) throws PropertyVetoException {
CustomJob myCustomjob = new CustomJob();
// Initialize config factories
myCustomjob.initialize();
// trigger dataflow job
myCustomjob.parallelRead(args);
}
结果,我还必须在管道类中实现可序列化的接口,因为beam丢了错误 - java.io.notserializableException:org.devoteam.customjob
in platformInitialializer,我有一个Initilize()方法()包含配置工厂的初始化逻辑,还可以记录一些初始审核消息。
public class PlatformInitializer {
public void initialize() {
// Configfactory factory = new Configfactory()
// CustomLogger.log("JOB-BEGIN-EVENT" + Current timestamp )
}
}
我的问题是 - 这是调用一些需要在管道开始执行之前调用某些代码的正确方法吗?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(2)
如果您需要在运行时(而不是在管道构建时)初始化对象,则应该将初始化逻辑移至 Beam DoFn。
DoFn
有许多方法注释,可用于表示应在不同生命周期阶段执行的方法。Setup
和StartBundle
注释可能对您的用例有用。请参阅此处了解更多详细信息。If you need the initialized object at runtime (not at the pipeline construction time), you should move your initialization logic to a Beam DoFn.
DoFn
has a number of method annotations that could be used to denote methods that should be executed in different lifecycle phases.Setup
andStartBundle
annotations might be useful for your use-case. See here for more details.JVM初始器接口:,可能适用于Process-Level一次性初始化。为了初始化适用于特定dofn / ptransform的长寿命或昂贵对象(例如连接),DOFN生命周期方法通常更合适(请参阅 @chamikara的答案)。
The JVM initializer interface: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/harness/JvmInitializer.html , may be appropriate for process-level one-time initializations. For initializing long-lived or expensive objects (e.g. connections) applicable to a specific DoFn / PTransform, DoFn lifecycle methods are usually more appropriate (see @chamikara's answer).