Parse JSON data and store it as a DataFrame in Scala Spark
I am doing a POC for consuming Avro messages from Kafka and deserializing them using io.confluent.kafka.serializers.KafkaAvroDeserializer with a Schema Registry URL.
Here is the consumer config:
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, property.getProperty("bootstrap.servers"))
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
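One line is missing from the config as shown: KafkaAvroDeserializer cannot fetch writer schemas without the Schema Registry URL in the consumer properties. A minimal sketch, where "schema.registry.url" passed to props.put is the standard Confluent config key, and looking it up under the same name in the properties file is an assumption:

// "schema.registry.url" is the standard Confluent config key that
// KafkaAvroDeserializer reads; the properties-file lookup key is assumed.
props.put("schema.registry.url", property.getProperty("schema.registry.url"))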
I want to parse the JSON output into a file or a DataFrame. I am reading from multiple topics, so for each topic I want to create a separate file or DataFrame.
Consumer code:
println("Creating Kafka consumer!!!")
val consumer = new KafkaConsumer(props)
val topics: List[String] = List(property.getProperty("kafka.consumer.topic.names"))
consumer.subscribe(topics.asJava)
val records = consumer.poll(Duration.ofMinutes(4)).asScala
//val samplelist = records.toList
try {
var sampleSet : GenericRecord = null
//while(true){
for(record <- records){
println(s"*****************************")
println(s"${record.value()} ")
//sampleSet = record.value()
/*val writer = new BufferedWriter(new FileWriter(new File("s.json"),true))
writer.write(record.value().toString)
writer.close()*/
}
//}
}
finally consumer.close()
Output:
***********************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
************************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
****************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
*******************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
**************************
{"HML_C_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
I used a FileWriter to write the records into a .json file, but I am not confident whether this approach is correct.
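If the file route is kept, one way to get one file per topic is to key the writer off record.topic(). A minimal sketch, not the original code; writing one JSON document per line (JSON Lines) is an assumption, chosen because it matches what spark.read.json expects:

import java.io.{BufferedWriter, File, FileWriter}

// Group the polled records by topic and append each record's JSON
// rendering as its own line in a per-topic file.
records.groupBy(_.topic()).foreach { case (topic, recs) =>
  val writer = new BufferedWriter(new FileWriter(new File(s"$topic.json"), true))
  try recs.foreach { r =>
    writer.write(r.value().toString)
    writer.newLine()
  } finally writer.close()
}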
I want to store them in multiple DataFrames, for example:

HML_A_DATA : DataFrame =

| HML_ID | CREAT_DATETIME | ... | COV_ORDR_TYP_ID | COB_STRT_DT |
| --- | --- | --- | --- | --- |
| \u0002cf· | 1269064388106000 | ... | P | null |
| \u0002cf· | 1269064388106000 | ... | P | null |

and similarly for the other topics that I am consuming from.
Spark version = 2.4.4
Scala version = 2.11.12
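For the DataFrame side, a minimal sketch that should work on Spark 2.4.4: collect the GenericRecord JSON strings per topic on the driver and let spark.read.json infer the schema, then flatten the nested payload struct. The SparkSession setup and the driver-side collection are assumptions (acceptable for a POC, not for large volumes):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("avro-poc").master("local[*]").getOrCreate()
import spark.implicits._

// GenericRecord.toString yields JSON, so the strings can be fed straight
// into spark.read.json for schema inference, one DataFrame per topic.
val dfByTopic = records.groupBy(_.topic()).map { case (topic, recs) =>
  val jsonDs = spark.createDataset(recs.map(_.value().toString).toSeq)
  topic -> spark.read.json(jsonDs)
}

// Flatten the payload struct into top-level columns, e.g. (topic name is
// hypothetical): dfByTopic("topicA").select($"HML_A_DATA.*").show()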