Flink getResult()的聚合功能可以更改累加器值?
getResult()函数可以更改累加器的值吗?在累积时仍会有效应值。 代码在下面:
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
public AverageAccumulator add(Datum value, AverageAccumulator acc) {
acc.count += value.getWeight();
acc.sum += value.getValue();
return acc;
}
public Double getResult(AverageAccumulator acc) {
int result = acc.sum / (double) acc.count;
acc.count = 0; //here
acc.sum = 0; //here
return result;
}
}
Can the getResult() function change the value of the accumulator?It still takes effect value when accumulating.
The code is in below:
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
public AverageAccumulator add(Datum value, AverageAccumulator acc) {
acc.count += value.getWeight();
acc.sum += value.getValue();
return acc;
}
public Double getResult(AverageAccumulator acc) {
int result = acc.sum / (double) acc.count;
acc.count = 0; //here
acc.sum = 0; //here
return result;
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
不,您不能在
getResult
期间修改累加器。Flink将根据需要创建新的蓄能器(例如,对于每个新窗口)。
No, you cannot modify the accumulator during
getResult
.Flink will create new accumulators as necessary (e.g., for each new window).