Flink error: "The implementation of the StreamExecutionEnvironment is not serializable"

Posted on 2025-01-11 23:38:57

I am a beginner with Flink, and I tried to use it to run LFM, a recommendation algorithm, but the following error appeared when my code ran. I tried to track it down and fix it, without success. Could someone tell me why this happens?

Here is my main exception:

The implementation of the StreamExecutionEnvironment is not serializable

And here is my code:

Note that sourceDataStream comes from my custom source, which extends RichFunction<Tuple3<>>

//training model
for (int iter = 0; iter < iterations; iter++) {
    sourceDataStream
        // the exception appears here
        .process(new ProcessFunction<Tuple3<String, String, Double>, Object>() {
            @Override
            public void processElement(Tuple3<String, String, Double> in,
                                                   ProcessFunction<Tuple3<String, String, Double>, Object>.Context context,
                                                   Collector<Object> collector) throws Exception {

                Double hat_rui = predict(in.f0, in.f1, qbiTable, pbuTable, streamTableEnvironment);
                Double err_ui = in.f2 - hat_rui;
                Table pbuSelectTable = pbuTable.select($("buValue"), $("pList")).where($("userId").isEqual(in.f0));
                Table qbiSelectTable = qbiTable.select($("biValue"), $("qList")).where($("itemId").isEqual(in.f1));

                DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> pbuSelectDataStream = streamTableEnvironment.toRetractStream(pbuSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                pbuSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {

                @Override
                public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> userTuple,
                                                           ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context,
                                                           Collector<Object> collector) throws Exception {

                    DataStream<Tuple2<Boolean, Tuple2<Double, List<Double>>>> qbiSelectDataStream = streamTableEnvironment.toRetractStream(qbiSelectTable, new TupleTypeInfo<>(Types.BOOLEAN, Types.DOUBLE, Types.LIST(Types.DOUBLE)));
                    qbiSelectDataStream.process(new ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>() {

                        @Override
                        public void processElement(Tuple2<Boolean, Tuple2<Double, List<Double>>> itemTuple, ProcessFunction<Tuple2<Boolean, Tuple2<Double, List<Double>>>, Object>.Context context, Collector<Object> collector) throws Exception {

                            double bu = userTuple.f1.f0 + (alpha * (err_ui - lambd * userTuple.f1.f0));
                            double bi = itemTuple.f1.f0 + (alpha * (err_ui - lambd * itemTuple.f1.f0));

                            List<Double> pList = new ArrayList<>();
                            List<Double> qList = new ArrayList<>();
                            for (int fIter = 0; fIter < F; fIter++) {
                                Double pValueLast = userTuple.f1.f1.get(fIter);
                                Double qValueLast = itemTuple.f1.f1.get(fIter);
                                Double qValueNew = qValueLast + (alpha * (err_ui * pValueLast - lambd * qValueLast));
                                Double pValueNew = pValueLast + (alpha * (err_ui * qValueLast - lambd * pValueLast));
                                pList.add(pValueNew);
                                qList.add(qValueNew);
                            }
                            streamTableEnvironment.executeSql("INSERT OVERWRITE qbiTable VALUES ('qList', " + qList + "), ('biValue', " + bi + ")");
                            streamTableEnvironment.executeSql("INSERT OVERWRITE pbuTable VALUES ('pList', " + pList + "), ('buValue', " + bu + ")");
                        }
                    });
                }
            });
        }
    });
}

Comments (1)

︶葆Ⅱㄣ 2025-01-18 23:38:57

There are a few things about this that aren't going to work:

In the implementation of any user function (such as a ProcessFunction) you cannot have a DataStream, or a Table, or a StreamExecutionEnvironment, or another ProcessFunction. All you can do is react to an incoming stream record, optionally using state you have built up inside that function based on the previously processed records.
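To see why capturing an environment object inside a user function fails, it helps to remember that Flink's function interfaces are Serializable and that function instances are shipped to the workers in serialized form. The sketch below reproduces that situation in plain Java, without Flink on the classpath. UserFunction and FakeEnvironment are hypothetical stand-ins invented for this illustration, not Flink classes, and the exact check Flink performs may differ from the one shown here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CaptureDemo {
    /** Hypothetical stand-in for a user function type that must be serializable. */
    interface UserFunction extends Serializable {
        double apply(double x);
    }

    /** Hypothetical stand-in for an environment object; deliberately NOT Serializable. */
    static class FakeEnvironment {
        double scale() {
            return 2.0;
        }
    }

    /** Returns true if plain Java serialization accepts the object. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The anonymous class captures env, so env becomes a hidden field of the function. */
    static UserFunction capturingEnvironment() {
        FakeEnvironment env = new FakeEnvironment();
        return new UserFunction() {
            @Override
            public double apply(double x) {
                return x * env.scale();
            }
        };
    }

    /** The anonymous class captures only a primitive value, which serializes fine. */
    static UserFunction capturingPlainValue() {
        double scale = 2.0;
        return new UserFunction() {
            @Override
            public double apply(double x) {
                return x * scale;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("captures environment: " + isSerializable(capturingEnvironment()));
        System.out.println("captures plain value: " + isSerializable(capturingPlainValue()));
    }
}
```

In the question's code, the outer ProcessFunction refers to streamTableEnvironment, qbiTable and pbuTable, so it is in the same position as capturingEnvironment() here: the function object drags a non-serializable object along with it.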

The DataStream and Table APIs are organized around a builder paradigm, with which you are describing a streaming dataflow pipeline. This pipeline must be a directed acyclic graph: it can split and merge but must flow from sources to sinks without any loops. The stages of that pipeline (e.g., a ProcessFunction) must be coded as independent blocks -- they cannot reach outside of themselves to access data from other pipeline stages.
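One way to read "independent block" for this algorithm is a single update step that owns its model parameters and touches nothing else. The following is a minimal plain-Java sketch of that idea, not Flink code: the class name LocalLfmTrainer, the HashMap-based storage, the initial factor value 0.1 and the prediction formula (user bias + item bias + dot product) are all assumptions made for illustration, since the question does not show its predict method. The update rule itself follows the arithmetic in the question's code. In an actual Flink job the maps would have to be replaced by Flink's managed state, and user and item parameters would need a keying strategy that this sketch ignores.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class LocalLfmTrainer {
    private final int factors;   // F in the question's code
    private final double alpha;  // learning rate
    private final double lambd;  // regularization strength

    // All model state lives inside this object; nothing outside is referenced.
    private final Map<String, double[]> userFactors = new HashMap<>();
    private final Map<String, double[]> itemFactors = new HashMap<>();
    private final Map<String, Double> userBias = new HashMap<>();
    private final Map<String, Double> itemBias = new HashMap<>();

    LocalLfmTrainer(int factors, double alpha, double lambd) {
        this.factors = factors;
        this.alpha = alpha;
        this.lambd = lambd;
    }

    // Assumed initialization: every latent factor starts at 0.1.
    private double[] initialVector() {
        double[] v = new double[factors];
        Arrays.fill(v, 0.1);
        return v;
    }

    /** Assumed prediction: user bias + item bias + dot product of the factor vectors. */
    double predict(String user, String item) {
        double[] p = userFactors.computeIfAbsent(user, k -> initialVector());
        double[] q = itemFactors.computeIfAbsent(item, k -> initialVector());
        double sum = userBias.getOrDefault(user, 0.0) + itemBias.getOrDefault(item, 0.0);
        for (int f = 0; f < factors; f++) {
            sum += p[f] * q[f];
        }
        return sum;
    }

    /** Applies one SGD step for a single rating and returns the error before the update. */
    double processElement(String user, String item, double rating) {
        double err = rating - predict(user, item);

        double bu = userBias.getOrDefault(user, 0.0);
        double bi = itemBias.getOrDefault(item, 0.0);
        userBias.put(user, bu + alpha * (err - lambd * bu));
        itemBias.put(item, bi + alpha * (err - lambd * bi));

        double[] p = userFactors.get(user);
        double[] q = itemFactors.get(item);
        for (int f = 0; f < factors; f++) {
            double pOld = p[f];
            double qOld = q[f];
            // Both updates use the values from before this step, as in the question.
            p[f] = pOld + alpha * (err * qOld - lambd * pOld);
            q[f] = qOld + alpha * (err * pOld - lambd * qOld);
        }
        return err;
    }
}
```

Calling processElement("u1", "i1", 4.0) repeatedly on a trainer built with new LocalLfmTrainer(2, 0.05, 0.01) should shrink the returned error over time. The sketch only shows that the per-record logic needs no DataStream, Table or environment; it does not address the iteration over the dataset, which the paragraph above identifies as the part that conflicts with an acyclic pipeline.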

This paradigm isn't well suited for the purpose of training machine learning models (since training involves iterating/looping). If that's your objective, maybe take a look at https://github.com/apache/flink-ml.
