Why doesn't dask respect the order in a for-loop?

Posted on 2025-01-25 17:04:59

Why, when I run the for-loop in the code below, does dask seem to process 'Four' first, then 'One', and so on, instead of starting with the first element and finishing with the last?
Is it possible to get mixed-up (wrong) results, for example the content of one file/folder ending up in another? Or could conditions inside the for-loop be ignored?

Thanks in advance!

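import os
import pandas as pd
from dask import compute, delayed

def compa(filename):
    # Read one input file; each flag column marks the rows that belong to that category.
    filex = pd.read_json('folder/{}'.format(filename))
    for jj in ['Zero', 'One', 'Two', 'Three', 'Four']:
        filexz = filex[filex[jj] == 1].reset_index(drop=True)
        newpath = 'Newfolder/{}'.format(jj)
        # exist_ok=True avoids a race when several worker processes create the folder at once.
        os.makedirs(newpath, exist_ok=True)
        filexz.to_json('{}/{}'.format(newpath, filename))

# One delayed task per file; the tasks run in parallel worker processes.
delayed_results = [delayed(compa)(filename) for filename in filelist]
compute(*delayed_results, scheduler='processes')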

Code for replication purposes:

import pandas as pd
sof1=pd.DataFrame({'minus': ['a', 'b', 'c', 'd', 'e'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof2=pd.DataFrame({'minus': ['aa', 'bb', 'cc', 'dd', 'ee'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof3=pd.DataFrame({'minus': ['az', 'bz', 'cz', 'dz', 'ez'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof4=pd.DataFrame({'minus': ['azy', 'bzy', 'czy', 'dzy', 'ezy'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof5=pd.DataFrame({'minus': ['azx', 'bzx', 'czx', 'dzx', 'ezx'],'Zero': [1, 0, 0, 0, 0],'One': [0, 1, 0, 0, 0],'Two': [0, 0, 1, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof6=pd.DataFrame({'minus': ['azw', 'bzw', 'czw', 'dzw', 'ezw'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof7=pd.DataFrame({'minus': ['azyq', 'bzyq', 'czyq', 'dzyq', 'ezyq'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof8=pd.DataFrame({'minus': ['azxq', 'bzxq', 'czxq', 'dzxq', 'ezxq'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})
sof9=pd.DataFrame({'minus': ['azwq', 'bzwq', 'czwq', 'dzwq', 'ezwq'],'Zero': [1, 0, 0, 0, 0],'One': [0, 0, 1, 0, 0],'Two': [0, 1, 0, 0, 0],'Three': [0, 0, 0, 0, 1],'Four': [0, 0, 0, 1, 0]})

filelist=[sof1,
sof2,
sof3,
sof4,
sof5,
sof6,
sof7,
sof8,
sof9]

import os
from dask import compute, delayed

def compa(df):
    # Here each "file" is already a DataFrame; its second 'minus' value serves as the output file name.
    for jj in ['Zero', 'One', 'Two', 'Three', 'Four']:
        filexz = df[df[jj] == 1].reset_index(drop=True)
        newpath = 'Newfolderstackoverflow/{}'.format(jj)
        # exist_ok=True avoids a race when several worker processes create the folder at once.
        os.makedirs(newpath, exist_ok=True)
        filexz.to_json('{}/{}'.format(newpath, df.loc[1, 'minus']))

# One delayed task per DataFrame; the tasks run in parallel worker processes.
delayed_results = [delayed(compa)(df) for df in filelist]
compute(*delayed_results, scheduler='processes')

Since the code above finishes almost immediately, I don't know how to record the creation order, but the "Four" and "One" folders appear to be created first, then the rest! (The order in which files are created within each folder does not follow the order in filelist either, which is understandable to me, as those files are supposed to be computed in parallel.)

Thanks to the comments and answers, especially those of @MichaelDelgado, here is how it got solved: I added a 60-second sleep and noticed that every 60 seconds it creates two files at a time, adding them starting from folder Zero up to Four. The reason for my initial problem was that the last couple of files were added to the 5 folders within the same minute, so sorting the folders by time was meaningless and my OS fell back to sorting them alphabetically (hence "Four", then "One").
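Instead of inferring the order from folder timestamps, the order can be recorded inside the task itself. The following is a minimal sketch, not the original code: `compa_logged` is a hypothetical stand-in for `compa` that only logs when each category is handled, and the standard-library `ThreadPoolExecutor` is used as a stand-in for the dask scheduler so the example runs without extra dependencies.

```python
import time
from concurrent.futures import ThreadPoolExecutor

CATEGORIES = ['Zero', 'One', 'Two', 'Three', 'Four']

def compa_logged(name):
    """Hypothetical stand-in for compa: log each category instead of writing files."""
    log = []
    for jj in CATEGORIES:
        # The real function would filter the DataFrame and write the JSON file here.
        log.append((name, jj, time.time_ns()))
    return log

# Run three tasks on two workers (assumed names 'a', 'b', 'c' are placeholders).
with ThreadPoolExecutor(max_workers=2) as pool:
    logs = list(pool.map(compa_logged, ['a', 'b', 'c']))

# Extract the category order seen by each task.
orders = [[jj for _, jj, _ in log] for log in logs]
for order in orders:
    print(order)
```

Each task reports the categories in loop order, because the body of a task is ordinary sequential Python; only the interleaving between different tasks varies. Assuming dask is installed, the same function could be wrapped with `delayed` and the logs collected from the return value of `compute`.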


Comments (1)

通知家属抬走 2025-02-01 17:04:59

The order in which tasks are executed is determined by several factors:

  • user-specified priorities;
  • FIFO order;
  • graph structure.

With regard to the possibility of a mix-up: as long as the internal code is correct (so that no two processes write to the same file at the same time), this should not be possible. As noted in the comment by @mdurant, it looks like your loop writes to the same file multiple times.
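One way to check the "no two processes write to the same file" condition before launching any parallel work is to list every output path up front and look for duplicates. This is a minimal sketch under assumptions: `target_paths` and `find_collisions` are hypothetical helpers, and the `Newfolder/<category>/<file name>` layout mirrors the question's code.

```python
import os
from collections import Counter

CATEGORIES = ['Zero', 'One', 'Two', 'Three', 'Four']

def target_paths(filenames, root='Newfolder'):
    """Return every path the tasks would write: one per (file, category) pair."""
    return [os.path.join(root, jj, name) for name in filenames for jj in CATEGORIES]

def find_collisions(paths):
    """Return the paths that would be written more than once."""
    counts = Counter(paths)
    return sorted(p for p, n in counts.items() if n > 1)

# Distinct file names: every task writes to its own set of paths.
paths = target_paths(['a.json', 'b.json'])
collisions = find_collisions(paths)

# A repeated file name: two tasks would overwrite each other's output.
dup = find_collisions(target_paths(['a.json', 'a.json']))

print(len(paths), len(collisions), len(dup))
```

If the collision list is empty, each task owns its output files and the scheduling order cannot mix their contents; a non-empty list points at the file names that need to be made unique.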
