Flink getResult()的聚合功能可以更改累加器值?
getResult()函数可以更改累加器的值吗?在累积时仍会有效应值。 代码在下面:
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
public AverageAccumulator add(Datum value, AverageAccumulator acc) {
acc.count += value.getWeight();
acc.sum += value.getValue();
return acc;
}
public Double getResult(AverageAccumulator acc) {
int result = acc.sum / (double) acc.count;
acc.count = 0; //here
acc.sum = 0; //here
return result;
}
}
Can the getResult() function change the value of the accumulator?It still takes effect value when accumulating.
The code is in below:
public class WeightedAverage implements AggregateFunction<Datum, AverageAccumulator, Double> {
public AverageAccumulator createAccumulator() {
return new AverageAccumulator();
}
public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
a.count += b.count;
a.sum += b.sum;
return a;
}
public AverageAccumulator add(Datum value, AverageAccumulator acc) {
acc.count += value.getWeight();
acc.sum += value.getValue();
return acc;
}
public Double getResult(AverageAccumulator acc) {
int result = acc.sum / (double) acc.count;
acc.count = 0; //here
acc.sum = 0; //here
return result;
}
}
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
data:image/s3,"s3://crabby-images/d5906/d59060df4059a6cc364216c4d63ceec29ef7fe66" alt="扫码二维码加入Web技术交流群"
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
不,您不能在
getResult
期间修改累加器。Flink将根据需要创建新的蓄能器(例如,对于每个新窗口)。
No, you cannot modify the accumulator during
getResult
.Flink will create new accumulators as necessary (e.g., for each new window).