Spring Boot + Flink: processing Kafka data and dynamically writing it to different HDFS files
package cn.nucarf.tianyan.service.dwd;
import cn.nucarf.tianyan.config.ProYml;
import cn.nucarf.tianyan.config.SinkHDFS;
import cn.nucarf.tianyan.pojo.bean.AllocationBean;
import cn.nucarf.tianyan.pojo.event.AllocationEvent;
import cn.nucarf.tianyan.pojo.eventmessage.AllocationAndRecovery;
import cn.nucarf.tianyan.service.dwd.Impl.StreamInputImpl;
import cn.nucarf.tianyan.utill.DateU;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;
import java.util.Properties;
/**
 * Transfer: allocation record, allocation and recovery table.
 */
@Service
public class DwdAllocation implements StreamInputImpl, Serializable {

    private static final String kafkaTopic = "WJY_ALLO_RECYC_LOG";
    // Mutable static state -- see the warning inside flatMap() below.
    private static String CREATE_GMT = "2020-05-03";
    // 2021-04-01 00:00:00 (UTC+8) as epoch milliseconds.
    private static final Long TIME = 1617206400000L;

    @Autowired
    SinkHDFS sinkHDFS;
    @Autowired
    ProYml proYml;
    @Override
    public void DsPro(StreamExecutionEnvironment env, Properties properties) {
        // Consume raw JSON records from Kafka.
        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), properties));
        stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                AllocationBean allocationBean = JSON.parseObject(s, AllocationBean.class);
                List<AllocationEvent> events = allocationBean.getEvent();
                for (AllocationEvent e : events) {
                    // Keep events newer than TIME, plus historical records
                    // ("his_data_json") from before it.
                    if (e.getEventTime() > TIME
                            || (e.getEventTime() < TIME && "his_data_json".equals(e.getEventName()))) {
                        AllocationAndRecovery eventMessage = e.getEventMessage();
                        // WARNING: this assignment runs on the task managers at runtime,
                        // but the sink below is built once on the client while the job
                        // graph is assembled, so the sink only ever sees the initial
                        // value "2020-05-03". The output path never changes per record.
                        CREATE_GMT = DateU.getDate(eventMessage.getCreateGmt());
                        // Flatten the message to a line of field values and emit it.
                        String line = JSON.toJSONString(eventMessage);
                        collector.collect(JSON.parseObject(line).values().toString());
                    }
                }
            }
        })
        // CREATE_GMT is evaluated here exactly once, at job-construction time.
        .addSink(sinkHDFS.sinkHDFS(proYml.getDwdAlloRecycRecord(), CREATE_GMT));
    }
}
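The root cause of the "dynamic path" problem is that the sink is constructed exactly once, before any data flows: mutating a static field from inside flatMap() can never re-route output, because the task managers that run flatMap() are separate JVMs from the client that evaluated CREATE_GMT. With the legacy filesystem connector (the BucketingSink the original imports pulled in), per-record routing belongs in a custom Bucketer, which the sink consults for every element. Below is a minimal sketch under two assumptions not in the original code: flatMap() collects the full JSON line (so the date is still recoverable from each record), and DateU.getDate(long) formats an epoch-millis value as yyyy-MM-dd.

package cn.nucarf.tianyan.config;

import com.alibaba.fastjson.JSON;
import cn.nucarf.tianyan.utill.DateU;
import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.hadoop.fs.Path;

/**
 * Routes each record into a date-named subdirectory of the base path,
 * e.g. <basePath>/2021-04-01/part-0-0.
 */
public class CreateGmtBucketer implements Bucketer<String> {

    // Called by BucketingSink for every element.
    @Override
    public Path getBucketPath(Clock clock, Path basePath, String element) {
        // Assumption: the element is the eventMessage JSON and createGmt
        // is epoch milliseconds, formatted by DateU.getDate as yyyy-MM-dd.
        long createGmt = JSON.parseObject(element).getLongValue("createGmt");
        return new Path(basePath, DateU.getDate(createGmt));
    }

    // Wiring sketch: build the sink once; the Bucketer picks the directory per record.
    public static BucketingSink<String> dateBucketedSink(String basePath) {
        BucketingSink<String> sink = new BucketingSink<>(basePath);
        sink.setBucketer(new CreateGmtBucketer());
        sink.setWriter(new StringWriter<>());             // write each String as a line
        sink.setBatchSize(128L * 1024L * 1024L);          // roll part files at ~128 MB
        sink.setBatchRolloverInterval(60L * 60L * 1000L); // or at least hourly
        return sink;
    }
}

In DsPro(), the sink call would then become .addSink(CreateGmtBucketer.dateBucketedSink(proYml.getDwdAlloRecycRecord())) and CREATE_GMT can be deleted. If the files must keep the flattened values() format rather than JSON, an alternative is to prepend the date to each record in flatMap() and strip it off inside getBucketPath(). In newer Flink releases, StreamingFileSink with a custom BucketAssigner replaces the deprecated BucketingSink, but the idea is identical: the sink, not the job graph, decides the target file per record.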