Distributed execution with Apache Flink and (de)serialization of class fields/members
I have a simple proof of concept for a Flink job. It receives messages (in JSON format) from a Kafka topic, deserializes them into a domain model, validates them against a predefined set of rules, applies some transformations, and finally publishes the resulting message to a Kafka sink.
I have several functions/operators that use behavior from other "service" classes. Those "service" classes may import other dependencies as well.
As far as I know, Flink will try to (de)serialize those functions/operators in order to make the entire job truly distributed. I'm not clear whether Flink would skip serializing those fields/members if I mark them `transient`, or whether declaring them `static` would be enough to avoid that.
This is an example of what I have:
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public final class SomeFlatMapFunction implements FlatMapFunction<SomeMessage, Some> {

        private static final long serialVersionUID = -5810858761065889162L;

        // Dependencies are held as static singletons rather than instance fields.
        private static final SomeMapper MAPPER = SomeMapper.INSTANCE;
        private static final Validator VALIDATOR = Validator.INSTANCE;

        @Override
        public void flatMap(final SomeMessage value, final Collector<Some> out) {
            final var result = MAPPER.valueFrom(value);
            final var violations = VALIDATOR.getValidator().validate(result);
            // Only forward messages that pass validation.
            if (violations.isEmpty()) {
                out.collect(result);
            }
        }
    }
I haven't seen any issues with this so far, but I'm only running the application locally. What's the best/accepted approach here, even for cases where those dependencies have to be injected through the function's constructor? It also looks like maintaining state between those functions is highly discouraged.
Comments (1)
Operators do get serialized and deserialized. That's why there are several `Rich*` versions of the operators with `open` and `close` methods: they can be used to set things up after deserialization, once the operator is already on the task manager where it will run.

Flink respects Java's usual serialization rules and will not serialize `static` or `transient` members.

In my experience, injecting domain classes in operators' constructors isn't a problem.
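To make the `transient` plus `open` idea concrete, here is a minimal sketch that uses only plain Java serialization, with no Flink on the classpath. `ValidatingFunction`, `RuleService`, and `roundTrip` are hypothetical names made up for illustration; the `open()` method here merely stands in for the `open` lifecycle method of a `Rich*` function, and the round trip imitates shipping the function to a worker.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OperatorSerializationSketch {

    /** Hypothetical non-serializable service, standing in for a validator or mapper. */
    static final class RuleService {
        private final int maxLength;
        RuleService(int maxLength) { this.maxLength = maxLength; }
        boolean accepts(String s) { return s.length() <= maxLength; }
    }

    /** Stand-in for a Rich* function: a Serializable class with an open() hook. */
    static final class ValidatingFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        private final int maxLength;            // serializable config, injected via constructor
        private transient RuleService service;  // skipped by Java serialization

        ValidatingFunction(int maxLength) { this.maxLength = maxLength; }

        // Assumption: in a real Rich* function this logic would live in open().
        void open() { service = new RuleService(maxLength); }

        boolean isOpen() { return service != null; }

        boolean accept(String value) { return service.accepts(value); }
    }

    /** Serializes and deserializes an object with standard Java serialization. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ValidatingFunction original = new ValidatingFunction(5);
        original.open();

        ValidatingFunction copy = roundTrip(original);
        System.out.println(copy.isOpen());            // false: the transient field was not shipped

        copy.open();                                  // rebuild the service on the "worker" side
        System.out.println(copy.accept("flink"));     // true
        System.out.println(copy.accept("too long"));  // false
    }
}
```

The constructor argument survives the round trip because it is a plain serializable value, while the service itself is rebuilt from it after deserialization.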
Where you need to be careful is with domain classes that go over the network while the job is running, sometimes referred to as Data Transfer Objects. For those, the simplest thing is to implement them as POJOs, where two things are critical:

The second is particularly important if such POJOs will be part of your application's state, i.e. if you are using Flink's managed state API.
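As a rough illustration of what "implement them as POJOs" can look like, the sketch below follows the general shape Flink's documentation describes for POJO types: a public class, a public no-argument constructor, and fields that are public or reachable through getters and setters. This is an assumption about what applies here, not necessarily the two points meant above. `SomeEvent`, `ImmutableEvent`, and `hasPojoShape` are hypothetical names, and the check is far cruder than Flink's own type analysis.

```java
import java.lang.reflect.Modifier;

public class PojoShapeSketch {

    /** Hypothetical DTO: public class, public no-arg constructor, getters and setters. */
    public static class SomeEvent {
        private String id;
        private long timestamp;

        public SomeEvent() {}  // public no-argument constructor

        public SomeEvent(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }

    /** Similar data, but without a no-arg constructor or setters. */
    public static class ImmutableEvent {
        private final String id;
        public ImmutableEvent(String id) { this.id = id; }
        public String getId() { return id; }
    }

    /** Rough, hypothetical check: public class with a public no-arg constructor. */
    static boolean hasPojoShape(Class<?> type) {
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        try {
            // getConstructor() only finds public constructors.
            return Modifier.isPublic(type.getConstructor().getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasPojoShape(SomeEvent.class));      // true
        System.out.println(hasPojoShape(ImmutableEvent.class)); // false
    }
}
```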
And something you already considered: adding `serialVersionUID` is also a good idea.
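A small sketch of why the explicit ID helps, using only the JDK: `ObjectStreamClass.lookup` reports the ID that Java serialization will use for a class. With an explicit `serialVersionUID` it is exactly the declared value; without one, the JVM derives it from the class structure, so it can change when the class changes. The class names and `idOf` are hypothetical.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerialVersionSketch {

    /** Declares its ID explicitly, so the value is stable across recompiles. */
    static final class WithExplicitId implements Serializable {
        private static final long serialVersionUID = 42L;
        int value;
    }

    /** No explicit ID: Java computes one from the class structure. */
    static final class WithoutExplicitId implements Serializable {
        int value;
    }

    /** Returns the ID Java serialization associates with the given Serializable class. */
    static long idOf(Class<?> type) {
        return ObjectStreamClass.lookup(type).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(idOf(WithExplicitId.class));     // 42
        System.out.println(idOf(WithoutExplicitId.class));  // a computed value
    }
}
```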