Spark Structured Streaming (micro-batch mode) - running dependent jobs concurrently
I have a Structured Streaming program running on a GCP Dataproc cluster which reads data from Kafka every 10 minutes and then processes it.
This is a multi-tenant system, i.e. the program reads data from multiple customers.
In my current code, I'm looping over the customers, passing each one to the 3 programs - P1, P2, P3.
P1, P2, P3 are classes where the bulk of the processing happens, and the data is pushed back to Kafka.
def convertToDictForEachBatch(df, batchId):
    # code to change syslog to the required format - not included here since it is not relevant to the issue
    # loop over the customers, and call the classes P1, P2, P3 for each customer
    # TBD : run the processes in asynchronous fashion/concurrently
    for cust in hm.values():
        # tdict_ap - has data specific to P1, filter code is not shown
        p1 = P1(tdict_ap, spark, False, cust)
        # tdict_ap - has data specific to P2, filter code is not shown
        p2 = P2(tdict_ap, spark, False, cust)
        # tdict_ap - has data specific to P3, filter code is not shown
        p3 = P3(tdict_ap, spark, False, cust)
# df_stream = data read from Kafka; this calls the function convertToDictForEachBatch
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 minutes') \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()
In the above code, the processing is sequential. I would like to make the processing concurrent/asynchronous to the extent possible.
A couple of options I'm considering:
Using asyncio
- from what I understand, this might improve performance since, for each customer, it might allow the processing in the 3 classes to run asynchronously
Repartitioning the DataFrame by 'customer' + groupBy
- this should allow concurrency across the multiple customers (within a specific class) if there are sufficient executors
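For the first option, note that the per-customer calls inside `foreachBatch` run on the driver, and Spark actions are blocking, so plain asyncio coroutines would not overlap them; a thread pool is the usual way to submit the per-customer work concurrently. A minimal sketch of that idea, where `process_customer` is a hypothetical stand-in for the real P1/P2/P3 calls:

```python
from concurrent.futures import ThreadPoolExecutor

def process_customer(cust):
    # Stand-in for the real per-customer work:
    # P1(tdict_ap, spark, False, cust), P2(...), P3(...)
    return "processed " + cust

def run_batch_concurrently(customers, max_workers=8):
    # Submit one task per customer; each task runs the blocking
    # Spark calls for that customer on its own driver thread.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(process_customer, c) for c in customers]
        # Collect results in submission order; result() re-raises
        # any exception thrown inside a task.
        return [f.result() for f in futures]
```

Inside `convertToDictForEachBatch` this would replace the sequential `for cust in hm.values():` loop with `run_batch_concurrently(list(hm.values()))`. How much this helps depends on whether the cluster has enough executors to serve several concurrent jobs; a Spark fair scheduler pool per thread is often configured alongside this.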
Here is the code for the second option; I think this might need to be done in each of the 3 classes P1, P2, P3:
window = Window.partitionBy('cust')
all_DF = all_DF.repartition('cust').cache()
results = (
    all_DF
    .groupBy('cust')
    .apply(<function>)
)
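For the second option, `groupBy(...).apply(...)` with a pandas UDF has been superseded by `groupBy(...).applyInPandas(...)` in recent Spark versions. Either way, Spark hands each customer's rows to a plain pandas function, so that function can be written and unit-tested without a Spark session. A sketch, where the column names (`cust`, `value`) and the upper-casing logic are illustrative assumptions standing in for the real P1/P2/P3 processing:

```python
import pandas as pd

def process_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # Called once per customer group; pdf holds only that
    # customer's rows. Stand-in transformation shown here.
    out = pdf.copy()
    out["value"] = out["value"].str.upper()
    return out

# Spark wiring (sketch; requires a SparkSession and a DataFrame all_DF):
# results = (
#     all_DF
#     .repartition("cust")
#     .groupBy("cust")
#     .applyInPandas(process_group, schema=all_DF.schema)
# )
```

With enough executors, Spark then runs `process_group` for different customers in parallel, which is the concurrency the second option is after.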
Please advise on the best way to achieve this.
Thanks in advance!