Parse JSON data and store it as a DataFrame in Scala Spark


I am doing a POC for consuming Avro messages from Kafka and deserializing them using io.confluent.kafka.serializers.KafkaAvroDeserializer with a Schema Registry URL.

Here is the consumer config:

props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName)
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, property.getProperty("bootstrap.servers"))
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer])
// KafkaAvroDeserializer needs the Schema Registry endpoint; the property name used here is assumed
props.put("schema.registry.url", property.getProperty("schema.registry.url"))
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

I want to parse the JSON output into a file or a DataFrame. I am reading from multiple topics, so for each topic I want to create a separate file or DataFrame.

Consumer code:

println("Creating Kafka consumer!!!")
val consumer = new KafkaConsumer(props)
val topics: List[String] = List(property.getProperty("kafka.consumer.topic.names"))

consumer.subscribe(topics.asJava)

  
val records = consumer.poll(Duration.ofMinutes(4)).asScala
//val samplelist = records.toList
    
try {
  var sampleSet : GenericRecord = null
  //while(true){
      for(record <- records){
        println(s"*****************************")
        println(s"${record.value()} ")
        //sampleSet = record.value()
        /*val writer = new BufferedWriter(new FileWriter(new File("s.json"),true))
        writer.write(record.value().toString)
        writer.close()*/
        
      }
    //}
}
finally consumer.close()
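Since each ConsumerRecord knows which topic it came from, one way to prepare for per-topic files or DataFrames is to bucket the deserialized values by record.topic() first. A minimal sketch (jsonByTopic is an illustrative name, not something from the original code):

// Group the Avro values by source topic; GenericRecord.toString yields the
// Avro JSON encoding, which is exactly what the println output below shows.
val jsonByTopic: Map[String, Seq[String]] =
  records.toSeq
    .groupBy(_.topic())
    .mapValues(_.map(_.value().toString))
    .toMap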

Output:

***********************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
************************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
****************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
*******************************
{"HML_A_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}
**************************
{"HML_C_DATA": {"HML_ID": "\u0002cf·", "CREAT_DATETIME": 1269064388106000, "CREAT_USER_ID": "SYSTEM_DATA_PROP", "CHG_DTTM": 1400715603961000, "CHG_USER_ID": "SYSTEM_DATA_PROP", "UPDT_VER_NBR": "\u0003", "HML_STS_RSN_TYP_ID": "7", "HML_STS_TYP_ID": "4", "SRVC_SETTING_TYP_ID": "1", "REV_PRR_TYP_ID": "1", "MBR_ID": "\u0001\tV", "MBR_COV_SEQ_NBR": 1, "COB_IND": 1, "COV_ORDR_TYP_ID": "P", "COB_STRT_DT": null}, "beforeData": null, "headers": {"operation": "REFRESH", "changeSequence": "", "timestamp": "", "streamPosition": "", "transactionId": "", "changeMask": null, "columnMask": null, "externalSchemaId": "75916", "transactionEventCounter": null, "transactionLastEvent": null}}

I used a FileWriter to write the records into a .json file (the commented-out block above), but I am not very confident whether this approach is correct or wrong.
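Appending everything to one s.json mixes the topics together, and opening a new BufferedWriter per record is wasteful. A sketch of writing one newline-delimited JSON file per topic instead, reusing jsonByTopic from above (the file-naming scheme is just an assumption):

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardOpenOption}

// Append each topic's records to its own file, e.g. <topic>.json
for ((topic, jsonLines) <- jsonByTopic) {
  Files.write(
    Paths.get(s"$topic.json"),
    (jsonLines.mkString("\n") + "\n").getBytes(StandardCharsets.UTF_8),
    StandardOpenOption.CREATE, StandardOpenOption.APPEND)
}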

I want to store it in multiple DataFrames, like

HML_A_DATA : DataFrame =

+-----------+------------------+-----+-----------------+-------------+
| HML_ID    | CREAT_DATETIME   | ... | COV_ORDR_TYP_ID | COB_STRT_DT |
+-----------+------------------+-----+-----------------+-------------+
| \u0002cf· | 1269064388106000 | ... | P               | null        |
| \u0002cf· | 1269064388106000 | ... | P               | null        |
+-----------+------------------+-----+-----------------+-------------+

for different topics that I am consuming from.
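Note that the outer JSON key differs per topic (HML_A_DATA above, HML_C_DATA for the other topic). One way to get DataFrames shaped like the table above is to parse each topic's JSON strings with spark.read.json and flatten the nested struct. A sketch, assuming a SparkSession named spark is already in scope and reusing jsonByTopic from earlier:

import org.apache.spark.sql.DataFrame
import spark.implicits._

// One DataFrame per topic: parse the JSON strings, then promote the fields of
// the nested record struct (HML_ID, CREAT_DATETIME, ...) to top-level columns.
val dfByTopic: Map[String, DataFrame] =
  jsonByTopic.map { case (topic, jsonLines) =>
    val raw = spark.read.json(spark.createDataset(jsonLines))
    topic -> raw.select($"HML_A_DATA.*")   // adjust the struct name per topic
  }

dfByTopic.foreach { case (topic, df) =>
  println(topic)
  df.show(truncate = false)
}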

Spark version = 2.4.4
Scala version = 2.11.12
