Flink error: "The implementation of the StreamExecutionEnvironment is not serializable"
I am a beginner with Flink, and I tried to use Flink to run LFM, one of the recommendation algorithms, but the following error appeared when my code ran. I tried to track it down and fix it, but without success. Could someone tell me why I am having this problem?
Here is my main exception:
The implementation of the StreamExecutionEnvironment is not serializable
And here is my code. Note that sourceDataStream comes from my custom source, which extends RichFunction<Tuple3<>>.
// training model
for (int iter = 0; iter < iterations; iter++) {
    sourceDataStream
        // the exception appears here
        .process(new ProcessFunction<Tuple3<String, String, Double>, Object>() {
            @Override
            public void processElement(Tuple3<String, String, Double> in,
                                        ProcessFunction<Tuple3<String, String, Double>, Object>.Context context,
                                        Collector<Object> collector) throws Exception {
                Double hat_rui = predict(in.f0, in.f1, qbiTable, pbuTable, streamTableEnvironment);
                Double err_ui = in.f2 - hat_rui;
                Table pbuSelectTable = pbuTable.select($("buValue"), $("pList")).where($("userId").isEqual(in.f0));
                Table qbiSelectTable = qbiTable.select($("biValue"), $("qList")).where($("itemId").isEqual(in.f1));
                DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> pbuSelectDataStream = streamTableEnvironment.toRetractStream(pbuSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                pbuSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {
                    @Override
                    public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> userTuple,
                                                ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                                                Collector<Object> collector) throws Exception {
                        DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> qbiSelectDataStream = streamTableEnvironment.toRetractStream(qbiSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                        qbiSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {
                            @Override
                            public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> itemTuple, ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context, Collector<Object> collector) throws Exception {
                                double bu = userTuple.f1.f0 + (alpha * (err_ui - lambd * userTuple.f1.f0));
                                double bi = itemTuple.f1.f0 + (alpha * (err_ui - lambd * itemTuple.f1.f0));
                                List<Double> pList = new ArrayList<>();
                                List<Double> qList = new ArrayList<>();
                                for (int fIter = 0; fIter < F; fIter++) {
                                    Double pValueLast = userTuple.f1.f1.get(fIter);
                                    Double qValueLast = itemTuple.f1.f1.get(fIter);
                                    Double qValueNew = qValueLast + (alpha * (err_ui * pValueLast - lambd * qValueLast));
                                    Double pValueNew = pValueLast + (alpha * (err_ui * qValueLast - lambd * pValueLast));
                                    pList.add(pValueNew);
                                    qList.add(qValueNew);
                                }
                                streamTableEnvironment.executeSql("INSERT OVERWRITE qbiTable VALUES ('qList', " + qList + "), ('biValue', " + bi + ")");
                                streamTableEnvironment.executeSql("INSERT OVERWRITE pbuTable VALUES ('pList', " + pList + "), ('buValue', " + bu + ")");
                            }
                        });
                    }
                });
            }
        });
}
1 Answer
There are a few things about this that aren't going to work:
In the implementation of any user function (such as a ProcessFunction) you cannot have a DataStream, or a Table, or a StreamExecutionEnvironment, or another ProcessFunction. All you can do is react to an incoming stream record, optionally using state you have built up inside that function based on the previously processed records.
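For example, here is a minimal sketch (not the question's algorithm) of what a user function can do: react to each incoming record and update state that it owns, keyed by the user id. The class name RunningErrorFunction and its logic are made up purely for illustration.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical function: tracks, per user, the difference between the current
// rating and the previous one. It only touches the incoming record and its own state.
public class RunningErrorFunction
        extends KeyedProcessFunction<String, Tuple3<String, String, Double>, Double> {

    private transient ValueState<Double> lastRating;

    @Override
    public void open(Configuration parameters) {
        lastRating = getRuntimeContext().getState(
                new ValueStateDescriptor<>("lastRating", Types.DOUBLE));
    }

    @Override
    public void processElement(Tuple3<String, String, Double> in,
                               Context ctx,
                               Collector<Double> out) throws Exception {
        Double previous = lastRating.value();   // state built from earlier records of this key
        lastRating.update(in.f2);               // update state from the incoming record only
        out.collect(previous == null ? in.f2 : in.f2 - previous);
    }
}

Notice that no DataStream, Table, or StreamExecutionEnvironment appears inside the function, so nothing non-serializable gets captured in its closure.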
The DataStream and Table APIs are organized around a builder paradigm, with which you describe a streaming dataflow pipeline. This pipeline must be a directed acyclic graph: it can split and merge, but it must flow from sources to sinks without any loops. The stages of that pipeline (e.g., a ProcessFunction) must be coded as independent blocks -- they cannot reach outside of themselves to access data from other pipeline stages.
This paradigm isn't well suited to the purpose of training machine learning models (since training involves iterating/looping). If that's your objective, maybe take a look at https://github.com/apache/flink-ml.
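As a rough illustration of that builder paradigm, this is what a pipeline is supposed to look like: every DataStream is created and wired together at the top level, before env.execute(), and never inside a user function. The inline test elements and the RunningErrorFunction from the sketch above are placeholders.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // All streams and transformations are composed here, outside any user function.
        DataStream<Tuple3<String, String, Double>> ratings = env.fromElements(
                Tuple3.of("user1", "item1", 4.0),   // placeholder data standing in for the custom source
                Tuple3.of("user1", "item2", 3.5));

        ratings
                .keyBy(t -> t.f0)                     // key by user id
                .process(new RunningErrorFunction())  // the sketch above
                .print();                             // sink

        env.execute("pipeline-sketch");               // submit the whole DAG once
    }
}

Note that wrapping such calls in a Java for loop, as in the question, only adds more parallel branches to the graph before it is submitted; it does not re-run a training step per iteration. The flink-ml project linked above includes iteration support intended for that kind of training loop.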