Using dask to fill boost_histogram histograms stored in a class

Posted 2025-01-21 06:27:30

I have a dask / boost_histogram question. My code is structured as follows:

I have a class defined in some script:

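import boost_histogram as bh
import dask

class MyHist:
    def __init__(self, ....):
        self.bh = None

    def make_hist(self, ...):
        axis = bh.axis.Regular(....)
        self.bh = bh.Histogram(axis)  # presumably stored on the instance

    @dask.delayed
    def fill_hist(self, data):
        self.bh.fill(data)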

In another script I want to fill multiple histograms in parallel with dask. The data are Awkward Arrays that I read from input files, so I do something like:

import dask
from dask.distributed import Client

cl = Client()
histos = [MyHist(..), MyHist(another...)]
fill_results = []
for i, file in enumerate(files):
    data = dask.delayed(open_file(file))
    for myhist in histos:
        if i == 0:
            myhist.make_hist()  # create the histogram once, on the first file
        fill_results.append(myhist.fill_hist(data))
dask.compute(*fill_results)

If I then try to call

for j, h in enumerate(histos):
    print(h.bh)

I get empty histograms. However, if I print the boost histogram inside the fill_hist function, the histograms seem to be filled.

Does the dask computation create a deep copy (or something similar) of the MyHist object to perform the computation, and hence fill the bh associated with that copy? Or am I doing something wrong here?

=====================================================================
Update:

I see a similar or worse wall-time when using dask to read and fill than using sequential code. This is the case whether or not I use my code or the suggested answer. For an example that doesn't use an intermediate class, I've written the following code:

from copy import copy

import awkward as ak
import boost_histogram as bh
import dask

files = get_input_file_paths('myprocess')

@dask.delayed
def make_a_var(jet_pt):
    jets_pt = copy(jet_pt)
    # Mask out events that have no jets
    jets_pt = ak.mask(jets_pt, ak.count(jets_pt, axis=1) >= 1)
    # Leading jet pT, scaled by 1e-3
    return jets_pt[:, 0] * 1e-3

@dask.delayed
def make_and_fill(data, axes):
    h = bh.Histogram(*axes, storage=bh.storage.Weight())
    h.fill(data)
    return h

batch_size = 4
results = []
for i in range(0, len(files), batch_size):
    batch = []
    for j, file in enumerate(files[i:i+batch_size]):
        data = dask.delayed(read_file(file))
        var = data['jet_pt']
        new_var = make_a_var(var)
        new_var = new_var.to_numpy()  # Needed bc bh breaks for masked ak arrays
        new_var = new_var.compressed()
        for k in range(10):
            axes = (bh.axis.Regular(25, 0, 250),)
            h = make_and_fill(new_var, axes)
            batch.append(h)
    results.append(batch)
dask.compute(*results)

For k in range(10), this code takes a similar wall time (~7 s) whether it runs sequentially or with dask. For k in range(100), the parallel code takes 15 s and the sequential code takes 21 s, which is not as big an improvement as I would have expected.
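One detail in the snippets above that may contribute to the timing: `dask.delayed(read_file(file))` calls `read_file` immediately, while the task graph is being built, and only wraps the returned value. Writing `dask.delayed(read_file)(file)` defers the call until `dask.compute`. The sketch below shows the difference with a hypothetical in-memory `read_file` and made-up file names (neither is from the original code); whether this explains the measured timings is an assumption, not something the post confirms.

```python
import dask

calls = []  # records every real invocation of read_file


def read_file(path):
    # Hypothetical reader used only for this illustration.
    calls.append(path)
    return [1.0, 2.0, 3.0]


# Eager: read_file runs right here, and only its result is wrapped.
eager = dask.delayed(read_file("file_a"))
calls_after_eager = list(calls)

# Lazy: the function itself is wrapped, so nothing runs yet.
lazy = dask.delayed(read_file)("file_b")
calls_after_lazy = list(calls)

# The deferred call executes only when the graph is computed.
(result,) = dask.compute(lazy, scheduler="synchronous")

print(calls_after_eager)
print(calls_after_lazy)
print(calls)
```

With the eager form, all file reading happens serially in the client process before any task is scheduled, so only the later steps can run in parallel.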


Answer from 因为看清所以看轻, posted 2025-01-28 06:27:30

I believe Jim's comment is correct about the source of the problem. I'll also offer a solution that I think may help:

I think the definition of your class makes it difficult to work correctly with dask; you would probably have an easier time if your fill_hist method were a free function instead. Also, in your loop you are calling dask.delayed on an already delayed method, which is likely not what you want:

fill_results.append(dask.delayed(myhist.fill_hist(data)))
#                                       ^^^^^^^^^
#                                 already delayed method
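To make the copy behaviour behind the empty histograms concrete: with a distributed Client, task inputs are serialized and sent to worker processes, so a method that mutates `self` acts on a copy rather than on the object held by the calling script. The sketch below mimics that with `copy.deepcopy` as a stand-in for the serialization round trip; `TinyHist` and `run_on_worker` are hypothetical names for illustration, and no dask or boost_histogram calls are involved.

```python
import copy


class TinyHist:
    """Hypothetical stand-in for MyHist: stores bin counts, fills in place."""

    def __init__(self, nbins):
        self.counts = [0] * nbins

    def fill(self, bin_indices):
        for i in bin_indices:
            self.counts[i] += 1
        return self


def run_on_worker(hist, bin_indices):
    # Stand-in for shipping the task input to another process:
    # the "worker" operates on its own copy of the object.
    worker_copy = copy.deepcopy(hist)
    return worker_copy.fill(bin_indices)


original = TinyHist(nbins=3)
returned = run_on_worker(original, [0, 1, 1, 2])

print(original.counts)  # the caller's object is untouched
print(returned.counts)  # the filled state exists only in the returned copy
```

This is why returning the filled histogram from the task, rather than relying on in-place mutation, tends to be the more robust pattern.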

My suggestion would be to go with a free function:

import boost_histogram as bh
import dask

@dask.delayed
def fill_hist(data, axes, storage=None):
    # Build a fresh histogram inside the task and return it
    storage = storage or bh.storage.Double()
    h = bh.Histogram(*axes, storage=storage)
    h.fill(data)
    return h

@dask.delayed
def open_file(fname):
    data = some_function_to_get_data(fname)
    return data

axes = (bh.axis.Regular(100, -10, 10),)  # tuple with a single axis
tasks = []
for f in files:
    data = open_file(f)
    hist = fill_hist(data=data, axes=axes)
    tasks.append(hist)

# dask.compute returns a 1-tuple holding the list of filled histograms
results = dask.compute(tasks)

This pattern is very similar to how dask-histogram works in its backend (and dask-histogram has support for dask-awkward!).
