How to use Python multiprocessing to parse Pandas DataFrames
I want to parse and act on a huge amount of data stored in a Pandas dataframe. I've tried using multiprocessing and despite getting "results", I don't feel that I'm doing it right and I would very much appreciate a sanity check.
Essentially, the code takes a pandas dataframe (df) and splits it into a number of smaller dataframes (if 10 cores, then 10 splits). It then starts up multiprocessing.Process and supplies one of the smaller dataframes to each Process instance, along with the target (doTheWork). The target method (doTheWork) gets busy with the smaller passed-in dataframe and then adds the results of the dataframe manipulation to a queue. Later, when all processes are finished, the data is retrieved from the queue.
Here's a trimmed-down version of the code:
def doTheWork(df, someString, someData, queue):
    newPatientsList = []
    #do all the Pandas work in here on object df
    #lots of Pandas and data manipulation here
    #then add my results to the queue
    queue.put(newPatientsList)

def startHere(self, df, numCores):
    splitList = []
    #split the pandas dataframe by the number of cores and add each
    #as an element to the splitList
    queue = multiprocessing.Queue()
    print("...using " + str(numCores) + " cores.")
    with multiprocessing.Manager() as manager:
        workers = []
        for i in range(numCores):
            #build a new process for each split and point it at the doTheWork method
            workers.append(multiprocessing.Process(
                target=self.doTheWork,
                args=(splitList[i], someString, someData, queue)))
        #go through each worker and start their job
        for worker in workers:
            worker.start()
        #go through the queue pulling out data and do stuff
        newPatients = Patients()
        for i in range(numCores):
            patientList = queue.get()
            for patient in patientList:
                newPatients.addPatient(patient)
        for worker in workers:
            worker.join()
        return newPatients
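For comparison, the same split-and-gather pattern can be expressed with multiprocessing.Pool, which handles the queueing and joining internally. This is only a minimal sketch: do_the_work and the toy DataFrame below are placeholders standing in for the real Pandas manipulation, and numpy's array_split is one way to do the splitting that handles row counts that don't divide evenly.

```python
import multiprocessing

import numpy as np
import pandas as pd

def do_the_work(df):
    # placeholder for the real Pandas manipulation: collect the
    # ids of rows whose value is positive
    return [int(row_id) for row_id, value in zip(df["id"], df["value"]) if value > 0]

def start_here(df, num_cores):
    # np.array_split splits a DataFrame by rows and tolerates
    # lengths that are not an exact multiple of num_cores
    splits = np.array_split(df, num_cores)
    with multiprocessing.Pool(num_cores) as pool:
        # map sends one split to each worker and returns the
        # per-split result lists in order
        results = pool.map(do_the_work, splits)
    # flatten the per-split lists into one list
    return [item for sublist in results for item in sublist]

if __name__ == "__main__":
    df = pd.DataFrame({"id": range(8), "value": [1, -1, 2, -2, 3, -3, 4, 5]})
    print(start_here(df, 2))  # -> [0, 2, 4, 6, 7]
```

One thing the original code already gets right, for what it's worth: it drains the queue with queue.get() before calling join(), which matters because a child process that still holds items in a multiprocessing.Queue will not terminate.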