通过CSV文件,通过KAFKA API将数据从Azure Databricks流到Azure事件中心

发布于 2025-02-11 07:11:07 字数 1516 浏览 0 评论 0原文

我是Azure Databricks和Event Hubs的新手。我一直在努力使用SPARK和KAFKA API将数据从Databrick流传输到事件中心。我想要流的数据在.CSV文件中。流启动,但输入率的仪表板为空白。这是一个代码段:

def write_to_event_hub(df:DataFrame, topic: str, bootstrap_servers: str, config: str, checkpoint_path: str):
    print("Producing to even hub via Kafka")
    df.writeStream\
    .format("kafka")\
    .option("topic", topic)\
    .option("kafka.bootstrap.servers", bootstrap_servers)\
    .option("kafka.sasl.mechanism", "PLAIN")\
    .option("kafka.security.protocol", "SASL_SSL")\
    .option("kafka.sasl.jaas.config", config)\
    .option("checkpointLocation", checkpoint_path)\
    .start()
    


write_to_event_hub(streaming_df, topic, bootstrap_servers, sasl_jaas_config, "./checkpoint")

和用于生成数据的代码:

streaming_df = spark.readStream.option("header", "true").schema(location_schema).csv(f"{path}").select("*")

我还附加了带有配置的图片。主题是事件中心的名称,连接字符串的格式:< endpoint = sb://xxxx.servicebus.windows.windows.net/; sharedAccessKeyname = xxxxx; sharedaccesskey = xxxx&gt = xxxx&gt = xxxx&gt = xxxx&gt = xxxx&gt ;

(我想连接到名称空间中的一个事件中心) 我阅读要流或配置的数据的方式可能有问题。有什么想法吗?

谢谢!

I am new to Azure Databricks and Event Hubs. I have been struggling for days to stream data from Databricks using Spark and Kafka API to an event hub.The data I want to stream is in a .csv file. The stream is starting but the Dashboard with the Input Rate is blank. Here is a code snippet:

def write_to_event_hub(df:DataFrame, topic: str, bootstrap_servers: str, config: str, checkpoint_path: str):
    print("Producing to even hub via Kafka")
    df.writeStream\
    .format("kafka")\
    .option("topic", topic)\
    .option("kafka.bootstrap.servers", bootstrap_servers)\
    .option("kafka.sasl.mechanism", "PLAIN")\
    .option("kafka.security.protocol", "SASL_SSL")\
    .option("kafka.sasl.jaas.config", config)\
    .option("checkpointLocation", checkpoint_path)\
    .start()
    


write_to_event_hub(streaming_df, topic, bootstrap_servers, sasl_jaas_config, "./checkpoint")

And the code used to generated data:

streaming_df = spark.readStream.option("header", "true").schema(location_schema).csv(f"{path}").select("*")

And I also attached a picture with the configuration. The topic is the name of the event hub, and the connection string has the format: <Endpoint=sb://XXXX.servicebus.windows.net/;SharedAccessKeyName=XXXX;SharedAccessKey=XXX=;EntityPath=XXXX>

(I want to connect to one event hub in the namespace)
Probably something is wrong with the way I read the data I want to stream or with the configuration. Any idea?

enter image description here
enter image description here

Thank you!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文