How to use Python's multiprocessing to parse Pandas DataFrames


I want to parse and act on a huge amount of data stored in a Pandas DataFrame. I've tried using multiprocessing, and although I'm getting "results", I don't feel that I'm doing it right, and I would very much appreciate a sanity check.

Essentially, the code takes a pandas DataFrame (df) and splits it into a number of smaller DataFrames (ten splits for ten cores). It then starts a multiprocessing.Process for each chunk, supplying one of the smaller DataFrames to each Process instance along with the target function (doTheWork). doTheWork gets busy with the smaller passed-in DataFrame and then puts the results of the manipulation onto a queue. Later, once all the processes have finished, the results are retrieved from the queue.
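
For the split step, a minimal sketch (assuming numpy and pandas are available, with toy data standing in for the real patient DataFrame) could use np.array_split, which also handles row counts that don't divide evenly across cores:

import numpy as np
import pandas as pd

#toy DataFrame standing in for the real patient data
df = pd.DataFrame({"patient_id": range(10), "value": range(10)})
numCores = 3

#splitting 10 rows across 3 cores yields chunks of 4, 3 and 3 rows
splitList = np.array_split(df, numCores)
print([len(chunk) for chunk in splitList])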

Here's a trimmed-down version of the code:

import multiprocessing

import numpy as np


def doTheWork(df, someString, someData, queue):
    newPatientsList = []
    #do all the Pandas work in here on object df
    #lots of Pandas and data manipulation here
    #then add my results to the queue
    queue.put(newPatientsList)


def startHere(df, someString, someData, numCores):
    #split the pandas dataframe into one chunk per core;
    #np.array_split copes with row counts that don't divide evenly
    splitList = np.array_split(df, numCores)

    queue = multiprocessing.Queue()
    print("...using " + str(numCores) + " cores.")

    workers = []
    for i in range(numCores):
        #build a new process (not a thread) and point it at doTheWork
        workers.append(multiprocessing.Process(
                       target=doTheWork,
                       args=(splitList[i], someString, someData, queue)))

    #go through each worker and start their job
    for worker in workers:
        worker.start()

    #drain the queue before joining: queue.get() blocks until each
    #worker has put its result, which avoids the deadlock that can
    #occur when joining a process whose queue is still full
    newPatients = Patients()
    for i in range(numCores):
        patientList = queue.get()
        for patient in patientList:
            newPatients.addPatient(patient)

    for worker in workers:
        worker.join()

    return newPatients
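
One caveat: on platforms that use the spawn start method (Windows, and macOS since Python 3.8), the driver code must sit behind an if __name__ == "__main__": guard, otherwise every worker re-imports the module and tries to launch processes of its own. A minimal sketch of how startHere might be driven, with placeholder values for someString and someData:

import multiprocessing

import pandas as pd

if __name__ == "__main__":
    #placeholder data; the real DataFrame would be far larger
    df = pd.DataFrame({"patient_id": range(1000)})
    numCores = multiprocessing.cpu_count()
    newPatients = startHere(df, "someString", {"some": "data"}, numCores)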
