How to properly create a session for API calls in Apache Beam
I am making a basic API call for each element in my PCollection. In my initial implementation I made the calls without a session, and the job took around 9 min 30 s for 1200 rows, of which about 320 s in total was spent in the API calls. I am using the Dataflow runner.
To improve the performance of the API calls, I created a session in the setup method of the DoFn used in my ParDo, as follows:
    import json
    import time

    import apache_beam as beam
    import requests


    class textapi_call(beam.DoFn):
        def __init__(self, api_key):
            self.api_key = api_key

        def setup(self):
            # Runs once per DoFn instance, so the session is reused across elements.
            self.session = requests.session()

        def process(self, element):
            # Columns 3-7: ADDRESS_LINE1, ADDRESS_LINE2, CITY, STATE, POSTAL_CODE
            address = element[3] + ", " + element[4] + ", " + element[5] + ", " + element[6] + ", " + element[7]
            url = "https://maps.googleapis.com/maps/api/place/textsearch/json?query="
            url += address
            url += "&key={}".format(self.api_key)
            params = {}
            start = time.time()
            res = self.session.get(url, params=params)
            results = json.loads(res.content)
            time_taken = time.time() - start  # seconds spent on this single call
            return [[element[0], address, str(results), time_taken]]
Even with the session, the job still takes over 9 minutes to run, with around 320 s still spent in the API calls. Running the same code on the direct runner, however, was more than twice as fast as the non-session code.
Question:
Is this the appropriate way of creating a session in Apache Beam?
I suspect that the session is somehow not being kept alive on the worker nodes in my implementation.
Sample input:
AGENT_ID,AGENT_NAME,DATE_OF_JOINING,ADDRESS_LINE1,ADDRESS_LINE2,CITY,STATE,POSTAL_CODE,EMP_ROUTING_NUMBER,EMP_ACCT_NUMBER
AGENT00001,Ray Johns,1993-06-05,1402 Maggies Way,,Waterbury Center,VT,05677,034584958,HKUN51252328472585
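A side note on the request itself: the code above concatenates the raw address into the URL, so a character such as `&` or `#` in an address would alter the query string. The following is only a sketch of one way to build the URL with the standard library's `urlencode`; the helper names `build_address` and `build_url` and the `DUMMY_KEY` value are hypothetical, and dropping the empty ADDRESS_LINE2 field is an assumption about the desired query, not something the original code does.

```python
from urllib.parse import urlencode

TEXTSEARCH_URL = "https://maps.googleapis.com/maps/api/place/textsearch/json"


def build_address(row):
    """Join the non-empty address columns (indices 3-7) with ', '.

    Assumption: empty fields (such as ADDRESS_LINE2 in the sample row)
    are skipped rather than left as ', , ' in the query.
    """
    return ", ".join(part for part in row[3:8] if part)


def build_url(row, api_key):
    """Return the text search URL with percent-encoded query parameters."""
    query = urlencode({"query": build_address(row), "key": api_key})
    return TEXTSEARCH_URL + "?" + query


# The sample row from the question, split on commas.
sample_row = (
    "AGENT00001,Ray Johns,1993-06-05,1402 Maggies Way,,Waterbury Center,"
    "VT,05677,034584958,HKUN51252328472585"
).split(",")

sample_address = build_address(sample_row)
sample_url = build_url(sample_row, "DUMMY_KEY")  # hypothetical placeholder key
print(sample_address)
print(sample_url)
```

Passing the same dictionary as `params=` to `self.session.get(...)` would have a similar effect, since `requests` encodes `params` itself.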
Comments (1)
Yes, this is the correct way to initialize and share a session in Apache Beam. There are a couple of reasons why you are not seeing the same speedup as with the direct runner.
On Dataflow there may be more than one worker (process), each of which has its own DoFn instance (and its own setup call), whereas the direct runner (in its default mode) always has exactly one DoFn instance, so a single session is shared across all elements.
(Most likely.) On Dataflow there are many other significant sources of overhead, e.g. starting up VMs, reading data from (and writing data to) a remote filesystem, and serializing and transferring data from machine to machine in a shuffle/aggregation. For a short job like this, it is quite possible that these dominate the time spent on the actual API calls.
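To put the figures from the question in perspective, here is a rough back-of-the-envelope calculation. It uses only the numbers quoted in the question (1200 rows, about 9 min 30 s of job time, about 320 s of summed API time). Treating the 320 s as sequential time on a single worker is an assumption; if the calls ran in parallel on several workers, their share of wall-clock time would be smaller than computed here.

```python
rows = 1200
job_seconds = 9 * 60 + 30   # about 9 min 30 s of wall-clock job time
api_seconds = 320           # summed time_taken over all elements

# Average latency of one API call.
per_call_seconds = api_seconds / rows

# Time not accounted for by the API calls (VM startup, I/O, shuffle, ...),
# assuming the calls ran one after another on a single worker.
non_api_seconds = job_seconds - api_seconds

# Upper bound on the fraction of the job spent in API calls.
api_share = api_seconds / job_seconds

print(f"average per call: {per_call_seconds:.3f} s")
print(f"outside API calls: {non_api_seconds} s")
print(f"API share of job time: at most {api_share:.0%}")
```

Under that assumption, roughly 250 s of the job lies outside the API calls, and a session cannot reduce that part.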