Spring Boot + Flink: processing Kafka data and dynamically writing it to different HDFS files

Posted on 2022-09-12 23:00:03 · 6938 characters · 27 views · 0 comments

package cn.nucarf.tianyan.service.dwd;

import cn.nucarf.tianyan.config.ProYml;
import cn.nucarf.tianyan.config.SinkHDFS;
import cn.nucarf.tianyan.pojo.bean.AllocationBean;
import cn.nucarf.tianyan.pojo.event.AllocationEvent;
import cn.nucarf.tianyan.pojo.eventmessage.AllocationAndRecovery;
import cn.nucarf.tianyan.service.dwd.Impl.StreamInputImpl;
import cn.nucarf.tianyan.utill.DateU;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;
import java.util.Properties;
/**
 * Transfer: allocation record / allocation and recovery table.
 */
@Service
public class DwdAllocation implements StreamInputImpl, Serializable {

    private static final String kafkaTopic = "WJY_ALLO_RECYC_LOG";

    // Partition date for the HDFS sink. Note: it is read once in DsPro() while the
    // job graph is built, even though flatMap() reassigns it per record at runtime.
    private static String CREATE_GMT = "2020-05-03";
    private static final Long TIME = 1617206400000L;

    @Autowired
    SinkHDFS sinkHDFS;
    @Autowired
    ProYml proYml;

    @Override
    public void DsPro(StreamExecutionEnvironment env, Properties properties) {
        // Consume raw JSON records from Kafka.
        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<>(kafkaTopic, new SimpleStringSchema(), properties));

        stream.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String s, Collector<String> collector) throws Exception {
                AllocationBean allocationBean = JSON.parseObject(s, AllocationBean.class);
                List<AllocationEvent> event = allocationBean.getEvent();
                for (AllocationEvent e : event) {
                    // Keep events newer than TIME, plus historical "his_data_json" events older than TIME
                    // (parentheses added to make the original &&/|| precedence explicit).
                    if (e.getEventTime() > TIME
                            || (e.getEventTime() < TIME && e.getEventName().equals("his_data_json"))) {
                        AllocationAndRecovery eventMessage = e.getEventMessage();
                        final String date = DateU.getDate(eventMessage.getCreateGmt());
                        CREATE_GMT = date;
                        // Flatten the event message into a plain line of field values.
                        String line = JSON.toJSONString(eventMessage);
                        String arr = JSON.parseObject(line).values().toString();
                        collector.collect(arr);
                    }
                }
            }
        })
        .addSink(sinkHDFS.sinkHDFS(proYml.getDwdAlloRecycRecord(), CREATE_GMT));
    }
}
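The implementation of SinkHDFS.sinkHDFS(...) is not shown, but the static CREATE_GMT passed to it is evaluated only once, while DsPro() builds the job graph on the client; reassigning it inside flatMap() on the TaskManagers later cannot change the path the sink was constructed with. The usual way to write each record into a different HDFS directory at runtime is to let the sink choose a bucket per element. The following is a minimal sketch using Flink's StreamingFileSink with a custom BucketAssigner; the names DatePartitionedSink and CreateGmtBucketAssigner, and the assumption that each emitted line starts with a yyyy-MM-dd date, are illustrative and not from the original project. The older BucketingSink referenced by the imports supports the same idea through setBucketer(...).

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class DatePartitionedSink {

    /** Illustrative assigner: routes each record to an HDFS sub-directory named after its date. */
    public static class CreateGmtBucketAssigner implements BucketAssigner<String, String> {

        @Override
        public String getBucketId(String element, Context context) {
            // Assumption: each emitted line starts with "yyyy-MM-dd"; a real job would
            // instead parse createGmt out of the JSON record here.
            return element.substring(0, 10);
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    /** Builds a row-format sink that writes each record under basePath/&lt;date&gt;/part-*. */
    public static StreamingFileSink<String> build(String basePath) {
        return StreamingFileSink
                .forRowFormat(new Path(basePath), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new CreateGmtBucketAssigner())
                .build();
    }
}

With a sink like this, the mutable CREATE_GMT static is no longer needed: addSink(DatePartitionedSink.build(proYml.getDwdAlloRecycRecord())) keeps a single sink operator in the job graph, and every record lands in the directory that matches its own date.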


Problem description

Source of the problem and your own approach

Relevant code

Paste the code as text (no screenshots)

What result do you expect? What error message do you actually see?
