读取没有 schema 的 Json Kafka 消息

发布于 2025-01-10 02:54:03 字数 291 浏览 0 评论 0原文

目前我们正在开发包含 Json 数据的实时数据源。

在阅读以下示例时 - https://sparkbyexamples.com/spark/spark-streaming-with-kafka/< /a>

看起来我们需要一个 kafka json 消息的模式。

有没有其他方法可以在没有模式的情况下处理数据?

Currently we are working on a real time data feeds having Json data.

While reading the examples from -
https://sparkbyexamples.com/spark/spark-streaming-with-kafka/

It looks like we need a schema for kafka json message.

Is there any other way to process data without schema ?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

空城仅有旧梦在 2025-01-17 02:54:03

运行zookeeper、Kafka服务器和其他所需的服务后尝试下面的代码。

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest")\
        .load()  #earliest

print("Printing Schema of transaction_detail_df: ")
df.printSchema()

transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)")

trans_detail_write_stream = transaction_detail_df1 \
    .writeStream \
    .trigger(processingTime='2 seconds') \
    .option("truncate", "false") \
    .format("console") \
    .start()

trans_detail_write_stream.awaitTermination()

只需更改基本配置,您就可以看到输出

try below code after running the zookeeper, Kafka server and other required service.

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest")\
        .load()  #earliest

print("Printing Schema of transaction_detail_df: ")
df.printSchema()

transaction_detail_df1 = df.selectExpr("CAST(value AS STRING)")

trans_detail_write_stream = transaction_detail_df1 \
    .writeStream \
    .trigger(processingTime='2 seconds') \
    .option("truncate", "false") \
    .format("console") \
    .start()

trans_detail_write_stream.awaitTermination()

just change the basic configuration, you would be able to see the output

烟花肆意 2025-01-17 02:54:03

您可以使用 SparkSQL get_json_object 函数从 JSON 字符串数据中解析数据,而无需定义任何其他架构。

您可以简单地使用 cast 函数来反序列化二进制键/值,如示例所示

You can use get_json_object SparkSQL function to parse data out of JSON string data without defining any additional schema.

You can simply use cast function to deserialize the binary key/value, as the example shows

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文