Spark Structured Streaming (batch mode) - running dependent jobs concurrently

I've a Structured Streaming program running on a GCP Dataproc cluster which reads data from Kafka every 10 minutes and then processes it.
This is a multi-tenant system, i.e. the program reads data from multiple customers.

In my current code, I'm looping over the customers and passing each one to the 3 programs - P1, P2, P3.
P1, P2, P3 are classes where the bulk of the processing happens, and the data is pushed back to Kafka.


def convertToDictForEachBatch(df, batchId):

    # code to change syslog to the required format - not included here
    # since it is not relevant to the issue

    # loop over the customers, and call the classes P1, P2, P3 for each one
    # TBD: run the processes asynchronously/concurrently
    for cust in hm.values():

        # tdict_ap - has data specific to P1, filter code is not shown
        p1 = P1(tdict_ap, spark, False, cust)

        # tdict_ap - has data specific to P2, filter code is not shown
        p2 = P2(tdict_ap, spark, False, cust)

        # tdict_ap - has data specific to P3, filter code is not shown
        p3 = P3(tdict_ap, spark, False, cust)


# df_stream = data read from Kafka, this calls function convertToDictForEachBatch       

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
         .outputMode("append") \
         .trigger(processingTime='10 minutes') \
         .option("truncate", "false") \
         .option("checkpointLocation", checkpoint) \
         .foreachBatch(convertToDictForEachBatch) \
         .start()
        


In the above code the processing is sequential; I would like to make it concurrent/asynchronous to the extent possible.

A couple of options I'm considering:

  1. Using asyncio
    - from what I understand, this might improve performance since, for each customer, it could allow the processing in the 3 classes to run asynchronously (a sketch follows this list)

  2. Repartitioning the dataframe by 'customer' + groupBy
    - this should allow concurrency across the multiple customers (within a specific class) if there are sufficient executors
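
Here is a minimal sketch of option 1, under some assumptions: P1/P2/P3 appear to be ordinary synchronous classes, so plain asyncio by itself won't overlap their blocking calls; the sketch therefore uses a ThreadPoolExecutor (which asyncio's run_in_executor would delegate to anyway). It assumes hm, tdict_ap, spark, and the three classes are in scope as in the original code; the max_workers cap is an arbitrary choice, not something from the original post.


from concurrent.futures import ThreadPoolExecutor, as_completed

def convertToDictForEachBatch(df, batchId):

    # ... syslog conversion, as in the original code ...

    def process_customer(cust):
        # P1 -> P2 -> P3 stay sequential within a customer (they are the
        # dependent jobs), but different customers run concurrently
        P1(tdict_ap, spark, False, cust)
        P2(tdict_ap, spark, False, cust)
        P3(tdict_ap, spark, False, cust)

    # arbitrary cap so the driver/scheduler isn't flooded with jobs
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(process_customer, c) for c in hm.values()]
        for fut in as_completed(futures):
            fut.result()  # re-raise any exception from that customer's run


Since all of these jobs are submitted from one SparkContext, setting spark.scheduler.mode=FAIR lets the concurrent per-customer jobs share executors instead of queuing up FIFO.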

Here is the code for option 2; I think this would need to be done in each of the 3 classes P1, P2, P3:


from pyspark.sql import Window

# window is defined here but not used by the snippet below
window = Window.partitionBy('cust')

# co-locate each customer's rows and cache the result
all_DF = all_DF.repartition('cust').cache()

results = (
    all_DF
    .groupBy('cust')
    .apply(<function>)  # <function>: a GROUPED_MAP pandas UDF, elided here
)
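
A note on the groupBy + apply form: GroupedData.apply expects a GROUPED_MAP pandas UDF and is deprecated in Spark 3.x in favor of applyInPandas. A minimal sketch of the newer form, where process_group is a hypothetical stand-in for the real per-customer logic and the output schema is assumed:


import pandas as pd

# hypothetical per-customer function; the real logic from P1/P2/P3 would go
# here - applyInPandas hands each customer's rows over as a pandas DataFrame
def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
    return pd.DataFrame({
        "cust": [pdf["cust"].iloc[0]],
        "row_count": [len(pdf)],  # placeholder for the actual processing
    })

results = (
    all_DF
    .groupBy("cust")
    .applyInPandas(process_group, schema="cust string, row_count long")
)


groupBy already shuffles by cust, so the extra repartition isn't strictly needed; each distinct cust value becomes an independent task, so concurrency across customers comes for free as long as there are enough executor cores.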

Please advise on the best way to achieve this?

TIA!
