Cascading - merging 2 aggregations
I have the following problem, which I am trying to solve with Cascading: I have a CSV file of records with the structure o,a,f,i,c
I need to aggregate the records by o,a,f and sum the i's and c's per group.
For example:
100,200,300,5,1
100,200,300,6,2
101,201,301,20,5
101,201,301,21,6
should yield:
100,200,300,11,3
101,201,301,41,11
I could not work out how to merge the 2 Every instances that I have (can I aggregate both fields at the same time?).
Do you have any idea?
Yosi
public class CascMain {
    public static void main(String[] args) {
        Scheme sourceScheme = new TextLine(new Fields("line"));
        Tap source = new Lfs(sourceScheme, "/tmp/casc/group.csv");
        Scheme sinkScheme = new TextDelimited(new Fields("o", "a", "f", "ti", "tc"), ",");
        Tap sink = new Lfs(sinkScheme, "/tmp/casc/output/", SinkMode.REPLACE);
        Pipe assembly = new Pipe("agg-pipe");
        Function function = new RegexSplitter(new Fields("o", "a", "f", "i", "c"), ",");
        assembly = new Each(assembly, new Fields("line"), function);
        Pipe groupAssembly = new GroupBy("group", assembly, new Fields("o", "a", "f"));
        Sum impSum = new Sum(new Fields("ti"));
        Pipe i = new Every(groupAssembly, new Fields("i"), impSum);
        Sum clickSum = new Sum(new Fields("tc"));
        Pipe c = new Every(groupAssembly, new Fields("c"), clickSum);
        // WHAT SHOULD I DO HERE
        Properties properties = new Properties();
        FlowConnector.setApplicationJarClass(properties, CascMain.class);
        FlowConnector flowConnector = new FlowConnector(properties);
        Flow flow = flowConnector.connect("agg", source, sink, assembly);
        flow.complete();
    }
}
Comments (1)
Use AggregateBy to aggregate multiple fields at the same time:
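As a library-free illustration of what "aggregating both fields at the same time" means for the sample data in the question, here is a minimal sketch in plain Java. It is not Cascading code, and the class and method names (GroupSumSketch, aggregate) are hypothetical. In Cascading 2.x the corresponding assembly would be an AggregateBy grouped on o,a,f with one SumBy per summed field (i into ti, c into tc).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only (not part of Cascading).
class GroupSumSketch {

    // Groups CSV lines of the form "o,a,f,i,c" by (o,a,f) and sums i and c
    // for each group in a single pass over the input.
    static List<String> aggregate(List<String> lines) {
        // Key "o,a,f" -> {sum of i, sum of c}.
        // LinkedHashMap keeps groups in first-seen order.
        Map<String, long[]> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split(",");
            String key = parts[0] + "," + parts[1] + "," + parts[2];
            long[] sums = totals.computeIfAbsent(key, k -> new long[2]);
            sums[0] += Long.parseLong(parts[3]); // running total of i ("ti")
            sums[1] += Long.parseLong(parts[4]); // running total of c ("tc")
        }

        // Emit one "o,a,f,ti,tc" line per group.
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, long[]> e : totals.entrySet()) {
            out.add(e.getKey() + "," + e.getValue()[0] + "," + e.getValue()[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
            "100,200,300,5,1",
            "100,200,300,6,2",
            "101,201,301,20,5",
            "101,201,301,21,6");
        // Prints:
        // 100,200,300,11,3
        // 101,201,301,41,11
        for (String row : aggregate(input)) {
            System.out.println(row);
        }
    }
}
```

Both sums are kept under one grouping key, so each record is read once and the two totals never need to be merged afterwards, which is the same idea as applying several aggregations to a single grouping in a pipe assembly.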