Do I need multiple Event Hub consumer groups for multiple sinks?

Posted on 2025-01-23 23:00:59

I am receiving streaming data from an Event Hub, and the incoming messages are of 4 different types.

I am reading the data from the Event Hub on my Databricks cluster like this:

# Encrypt the connection string with the connector's helper (the Spark
# Event Hubs connector expects it encrypted) and pick a consumer group.
ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_INSTANCE_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = 'ConsumerGroup_1'

# A single streaming source over the Event Hub.
spark_df = spark.readStream.format("eventhubs").options(**ehConf).load()

The message type is carried in the message properties, so I separate the data using Spark's where function, something like below:

from pyspark.sql.functions import col

# body arrives as binary; the text sink accepts only a single string
# column, so cast it here.
df_type_A = spark_df.select(col("body").cast("string")).where(spark_df.properties["msgType"]=="TypeA")
df_type_B = spark_df.select(col("body").cast("string")).where(spark_df.properties["msgType"]=="TypeB")
df_type_C = spark_df.select(col("body").cast("string")).where(spark_df.properties["msgType"]=="TypeC")
df_type_D = spark_df.select(col("body").cast("string")).where(spark_df.properties["msgType"]=="TypeD")

And then I write the data to different sinks, something like below:

df_type_A.writeStream\
    .format("text")\
    .trigger(processingTime='10 seconds')\
    .option("checkpointLocation", "/mnt/type_A/Checkpoint")\
    .option("path", "/mnt/type_A/Data")\
    .start()

df_type_B.writeStream\
    .format("text")\
    .trigger(processingTime='10 seconds')\
    .option("checkpointLocation", "/mnt/type_B/Checkpoint")\
    .option("path", "/mnt/type_B/Data")\
    .start()

df_type_C.writeStream\
    .format("text")\
    .trigger(processingTime='10 seconds')\
    .option("checkpointLocation", "/mnt/type_C/Checkpoint")\
    .option("path", "/mnt/type_C/Data")\
    .start()

df_type_D.writeStream\
    .format("text")\
    .trigger(processingTime='10 seconds')\
    .option("checkpointLocation", "/mnt/type_D/Checkpoint")\
    .option("path", "/mnt/type_D/Data")\
    .start()
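
(A side note: each start() above returns a StreamingQuery handle. In a notebook the queries keep running on their own, but if this runs as a standalone job the driver has to block on them, e.g. with the standard call below.)

# Block the driver while the streaming queries run; awaitAnyTermination()
# returns as soon as any one of the four queries terminates or fails.
spark.streams.awaitAnyTermination()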

As I understand it, Spark follows lazy execution, and for multiple sinks it will create 4 different DAGs. Microsoft says "it's recommended that there's only one active receiver on a partition per consumer group." (https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-features)
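
(This can be checked from the driver: each sink registers an independent streaming query, each with its own receivers on the Event Hub.)

# Listing the active queries should show four entries, one per sink.
for q in spark.streams.active:
    print(q.name, q.id)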

For everything to function properly, do I need to create 4 different consumer groups on the Event Hub and write a separate job for each type (A, B, C, D), or is one consumer group enough?

Is creating multiple consumer groups the only option, or is it possible to avoid that and do the same task efficiently?

Edit: I tried using a single consumer group for my 4 jobs, but it didn't work. It throws: "New receiver with higher epoch of '0' is created hence current receiver 'spark-driver-14' with epoch '0' is getting disconnected. If you are recreating the receiver, make sure a higher epoch is used."
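
(For illustration, the 4-consumer-group variant being asked about would look roughly like this; the group names ConsumerGroup_A..D are placeholders and would have to be created on the Event Hub first:)

from pyspark.sql.functions import col

def read_type_stream(consumer_group):
    # One dedicated consumer group (and hence its own receivers) per stream.
    conf = {
        'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_INSTANCE_CONNECTION_STRING),
        'eventhubs.consumerGroup': consumer_group,
    }
    return spark.readStream.format("eventhubs").options(**conf).load()

df_type_A = read_type_stream('ConsumerGroup_A') \
    .select(col("body").cast("string")) \
    .where(col("properties")["msgType"] == "TypeA")
# ...and similarly ConsumerGroup_B/C/D for TypeB/C/D.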

拍不死你 2025-01-30 23:00:59

A consumer group is another view over the same data. It is not a concept that allows you to route messages to a specific consumer group: each consumer group processes all the messages of all partitions. I think the diagram below shows this well:

[Image: Event Hubs architecture diagram showing partitions fanned out to multiple consumer groups]

(Taken from the docs)
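
To make that concrete, here is a minimal sketch (illustration only, with hypothetical group names): two readers on different consumer groups each receive every message from every partition; the group itself routes and filters nothing.

conf_a = dict(ehConf, **{'eventhubs.consumerGroup': 'group_a'})
conf_b = dict(ehConf, **{'eventhubs.consumerGroup': 'group_b'})

# Both views see the SAME full stream; a consumer group is a view with
# its own cursor, not a filter.
view_a = spark.readStream.format("eventhubs").options(**conf_a).load()
view_b = spark.readStream.format("eventhubs").options(**conf_b).load()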

Now, regarding this:

it's recommended that there's only one active receiver on a partition per consumer group

A receiver takes a lock on the partition it is reading for a specific consumer group. If multiple receivers are reading from the same partition and consumer group, they will compete over acquiring the lock. That's inefficient, and that is why it is advised to have a single active receiver per partition per consumer group.

To me, your approach seems valid as it is. No need for multiple consumer groups.
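
That said, given the epoch error in your edit (four queries in one consumer group means four competing receivers per partition), one way to keep a single consumer group with a single active receiver is to demultiplex inside one query with foreachBatch. A rough sketch, not from the original post (the combined checkpoint path is hypothetical):

from pyspark.sql.functions import col

def write_by_type(batch_df, batch_id):
    # Cache the micro-batch since it is scanned once per message type.
    batch_df.persist()
    for msg_type, path in [("TypeA", "/mnt/type_A/Data"),
                           ("TypeB", "/mnt/type_B/Data"),
                           ("TypeC", "/mnt/type_C/Data"),
                           ("TypeD", "/mnt/type_D/Data")]:
        batch_df.where(col("properties")["msgType"] == msg_type) \
                .select(col("body").cast("string")) \
                .write.mode("append").text(path)
    batch_df.unpersist()

# One streaming query -> one receiver per partition per consumer group.
spark_df.writeStream \
    .foreachBatch(write_by_type) \
    .trigger(processingTime='10 seconds') \
    .option("checkpointLocation", "/mnt/all_types/Checkpoint") \
    .start()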
