Filling boost_histograms stored in a class with dask
I have a dask + boost_histogram question. My code is structured as follows:
I have a class defined in some script:
import dask
import boost_histogram as bh

class MyHist:
    def __init__(self, ....):
        self.bh = None

    def make_hist(self, ...):
        axis = bh.axis.Regular(....)
        self.bh = bh.Histogram(axis)  # store the (empty) histogram on the instance

    @dask.delayed
    def fill_hist(self, data):
        self.bh.fill(data)
and in another script I want to fill multiple histograms in parallel with dask. The data are awkward arrays that I read from input, and for that I do something like:
from dask.distributed import Client

cl = Client()
histos = [MyHist(..), MyHist(another...)]
fill_results = []
for i, file in enumerate(files):
    data = dask.delayed(open_file(file))
    for myhist in histos:
        if i == 0: myhist.make_hist()
        fill_results.append(myhist.fill_hist(data))
dask.compute(*fill_results)
If I then try to call
for j, h in enumerate(histos):
    print(h.bh)
I get empty histograms. However, if I print the boost histogram inside the fill_hist function, the histograms seem to be filled.
Does the dask computation create a deep copy (or something) of the MyHist object to perform the computation, and hence fill the bh associated with that copy? Or am I doing something wrong here?
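(For illustration, a minimal local sketch of the mechanism being asked about, assuming the MyHist class above: dask.distributed pickles task inputs, including the instance bound to a delayed method, before running them on worker processes.)

import pickle

mh = MyHist(..)     # placeholder args as above
mh.make_hist()
mh_copy = pickle.loads(pickle.dumps(mh))  # roughly what shipping a task to a worker does
mh_copy.bh.fill(data)                     # the worker fills the copy's bh
print(mh.bh)                              # the client-side instance stays empty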
=====================================================================
Update:
I see similar or worse wall-time when using dask to read and fill than with sequential code. This is the case whether I use my own code or the suggested answer. For an example that doesn't use an intermediate class, I've written the following code:
from copy import copy

import awkward as ak
import boost_histogram as bh
import dask

files = get_input_file_paths('myprocess')

@dask.delayed
def make_a_var(jet_pt):
    jets_pt = copy(jet_pt)
    jets_pt = ak.mask(jets_pt, ak.count(jets_pt, axis=1) >= 1)
    return jets_pt[:, 0]*1e-3

@dask.delayed
def make_and_fill(data, axes):
    h = bh.Histogram(*axes, storage=bh.storage.Weight())
    h.fill(data)
    return h

batch_size = 4
results = []
for i in range(0, len(files), batch_size):
    batch = []
    for j, file in enumerate(files[i:i+batch_size]):
        # Note: read_file(file) runs immediately here and wraps its result;
        # dask.delayed(read_file)(file) would defer the read itself.
        data = dask.delayed(read_file(file))
        var = data['jet_pt']
        new_var = make_a_var(var)
        new_var = new_var.to_numpy()  # needed b/c bh breaks for masked ak arrays
        new_var = new_var.compressed()
        for k in range(10):
            axes = (bh.axis.Regular(25, 0, 250),)
            h = make_and_fill(new_var, axes)
            batch.append(h)
    results.append(batch)
dask.compute(*results)
It takes a similar amount of wall time (~7 s) to run this code sequentially and with dask for k in range(10). For k in range(100), the parallel code takes 15 s and the sequential code takes 21 s, which is not as big an improvement as I would have thought.
=====================================================================
Answer:
I believe Jim's comment is correct w.r.t. the source of the problem; I'll also offer a solution I think may be helpful:
I think the definition of your class makes it difficult to work correctly with dask; that is, you will probably have an easier time if your fill_hist method is a free function. Also, in your loop you are actually calling dask.delayed on an already delayed method (this is likely not what you want to do). My suggestion would be to go with a free function:
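(The answer's original code block was lost in extraction; what follows is a hedged sketch of the free-function pattern it describes, reusing open_file and files from the question. The axis choice and variable names are illustrative, not from the original.)

import dask
import boost_histogram as bh
from dask.distributed import Client

@dask.delayed
def fill_hist(data, axes):
    # Build, fill, and *return* the histogram inside the task, instead of
    # mutating state on a class instance (which would only update the
    # pickled copy of that instance living on the worker).
    h = bh.Histogram(*axes, storage=bh.storage.Weight())
    h.fill(data)
    return h

cl = Client()
axes = (bh.axis.Regular(25, 0, 250),)
tasks = [fill_hist(dask.delayed(open_file)(f), axes) for f in files]
histograms = dask.compute(*tasks)  # a tuple of filled bh.Histogram objects

Note that dask.delayed(open_file)(f) defers the read itself to the workers, whereas dask.delayed(open_file(f)) would open the file eagerly on the client. Because each task returns its own histogram, the filled results come back to the client, where per-file histograms can be combined with + if a single histogram is wanted.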
This pattern is very similar to how dask-histogram works on its backend (and dask-histogram has support for dask-awkward!).
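(As a hedged pointer beyond the original answer: dask-histogram exposes a boost-histogram-like interface; the sketch below follows its documented usage as best I recall, so treat the exact module path and methods as assumptions to check against the dask-histogram docs.)

import dask.array as da
import boost_histogram as bh
import dask_histogram.boost as dhb

x = da.random.uniform(0, 250, size=(100_000,), chunks=(25_000,))
h = dhb.Histogram(bh.axis.Regular(25, 0, 250), storage=bh.storage.Weight())
h.fill(x)        # staged lazily, one fill task per chunk
h = h.compute()  # materialize the filled histogram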