Using SciPy interpolation methods with a Dask dataframe

Published 2025-01-15 03:51:33


I have read a bunch of Dask examples, from people's GitHub code and from the Dask issue tracker, but I still have a problem using SciPy interpolation with Dask parallel computing, and I hope someone here can help me solve it.

My actual question is how to expand each partition's boundary. Please see my description below, and let me know if anything is unclear.

  1. My data is unstructured, so I cannot use a regular array.
  2. My interpolation code runs, but some strange points appear in the output; I bet this is an edge effect. For example:

Example 01: SRTM30 to H3 resolution 12
The left panel is the output of the "dask" LinearNDInterpolator, the middle panel is the original dataset, and the right panel uses LinearNDInterpolator directly.

Example 02: Sentinel-2 to H3 resolution 12
The left panel is a scatter plot linearly interpolated from the original dataset, while the right one uses the Dask linear interpolation via dask.dataframe [parallel].

You can clearly see that the parallel-computing result has no clear shape, and some strange points are visible within the map.
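The edge effect described above can be reproduced with a tiny self-contained sketch (hypothetical data, SciPy only, not the original pipeline): interpolating one "partition" of a point cloud in isolation disagrees with a global interpolation near the partition boundary, because the boundary triangles are built from different neighbor sets.

```python
import numpy as np
from scipy.interpolate import LinearNDInterpolator

rng = np.random.default_rng(0)
# Unstructured source points on the unit square with a smooth field.
src = rng.random((400, 2))
val = np.sin(3 * src[:, 0]) * np.cos(3 * src[:, 1])

# Targets just left of the x = 0.5 split line.
tgt = np.column_stack([np.full(50, 0.48), np.linspace(0.05, 0.95, 50)])

# Global interpolation sees every source point.
global_out = LinearNDInterpolator(src, val)(tgt)

# A naive "partition" sees only the points with x < 0.5.
left = src[:, 0] < 0.5
left_out = LinearNDInterpolator(src[left], val[left])(tgt)

# Near the partition edge the two disagree: the triangulations differ
# (and targets outside the partition's convex hull come back as NaN).
err = np.nanmax(np.abs(left_out - global_out))
print(err)
```

This is the same mechanism as the strange points in the maps: target points near a partition edge see a truncated, distorted triangulation.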

Here is my code 01, using dask.array:

import numpy as np
import pandas as pd
import dask.array as da
import dask.dataframe as dd
from dask import delayed
from scipy.interpolate import LinearNDInterpolator as lNDI

def lNDIwrap(src_lon, src_lat, src_elv, tag_lon, tag_lat):
    #--- Build a LinearNDInterpolator on the source points, evaluate at the targets.
    return lNDI(list(zip(src_lon, src_lat)), src_elv)(tag_lon, tag_lat)

n_splits = 96

#--- topodf is the topography dataset [pandas].
#--- df is the python h3 generated hexagon grid by topodf in resolution-12
dsrc = dd.from_pandas(topodf, npartitions=n_splits)
dtag = dd.from_pandas(df, npartitions=n_splits)

#--- Output the chunked dask arrays
slon, slat, data = dsrc.to_dask_array(lengths=True).T
tlon, tlat = dtag[['lon','lat']].to_dask_array(lengths=True).T

#--- Use **dask delayed** to pass each partition into lNDIwrap
gd_chunked_lNDI = [delayed(lNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

#--- Use **dask delayed** to concatenate all partitions into one np.array
gd_lNDI = delayed(da.concatenate)(gd_chunked_lNDI, axis=0)
results_lNDI = np.array(gd_lNDI.compute())

Here is my code 02, using dask.dataframe:

def DDlNDIwrap(df, data_name='nir'):
    #--- Build the resolution-12 hexagon targets for this partition,
    #--- interpolate onto them, and return a single-partition dask dataframe.
    dtag = target_hexgrid(df, hex_res=12)
    slon, slat, data = df[['lon','lat',data_name]].values.T
    tlon, tlat = dtag[['lon','lat']].values.T
    tout = lNDI(list(zip(slon, slat)), data)(tlon, tlat)
    dout = pd.DataFrame(np.vstack([tlon, tlat, tout]).T,
                        columns=['lon','lat',data_name]
                       )
    return dd.from_pandas(dout, npartitions=1)

n_splits = 96

#--- dsrc is the Sentinel-2 satellite dataset, read from a .til file.
gd_chunked_lNDI = [delayed(DDlNDIwrap)(ds) for ds in dsrc.to_delayed()]
gd_lNDI = delayed(dd.concat)(gd_chunked_lNDI, axis=0)
#--- Two computes are needed because DDlNDIwrap returns a dask dataframe:
#--- the first materializes the concat, the second the resulting dask dataframe.
gd = gd_lNDI.compute().compute()

I suspect the unknown patterns are an edge/side effect: those strange points could lie near the edge of each partition, where there are not enough data points for interpolation. The Dask manual suggests I might be able to use map_overlap, map_partitions, or map_blocks to solve this, but I keep failing. Could someone help me solve this?
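One caveat with `dask.dataframe.map_overlap` is that it pads partitions by row position, not by spatial proximity, so a row-count overlap may not add the geographically neighboring points that the interpolation actually needs. A hedged alternative sketch (not the original code; `interp_partition_padded` and all data here are hypothetical): pad each partition explicitly by expanding its bounding box by a margin, select the source points that fall inside the padded box, build the interpolator on that padded set, and evaluate only the partition's own targets.

```python
import numpy as np
import pandas as pd
from scipy.interpolate import LinearNDInterpolator

def interp_partition_padded(src_df, tgt_df, all_src, margin, col='z'):
    """Interpolate one partition's targets using source points padded
    by `margin` degrees beyond the partition's bounding box.
    `all_src` is the full source table (assumed small enough to share)."""
    lon_lo, lon_hi = src_df['lon'].min() - margin, src_df['lon'].max() + margin
    lat_lo, lat_hi = src_df['lat'].min() - margin, src_df['lat'].max() + margin
    pad = all_src[all_src['lon'].between(lon_lo, lon_hi) &
                  all_src['lat'].between(lat_lo, lat_hi)]
    f = LinearNDInterpolator(pad[['lon', 'lat']].to_numpy(), pad[col].to_numpy())
    out = f(tgt_df['lon'].to_numpy(), tgt_df['lat'].to_numpy())
    return tgt_df.assign(**{col: out})

# Tiny synthetic demo of one partition near a split boundary.
rng = np.random.default_rng(1)
all_src = pd.DataFrame({'lon': rng.uniform(0, 1, 500),
                        'lat': rng.uniform(0, 1, 500)})
all_src['z'] = np.sin(3 * all_src['lon']) + np.cos(3 * all_src['lat'])

# One "partition": the left half of the domain, targets near its edge.
part = all_src[all_src['lon'] < 0.5]
tgt = pd.DataFrame({'lon': np.full(20, 0.49), 'lat': np.linspace(0.1, 0.9, 20)})

res = interp_partition_padded(part, tgt, all_src, margin=0.05)
print(res['z'].isna().sum())
```

With Dask this function could be applied per partition via `map_partitions`, passing the full (or spatially indexed) source table as an extra argument; the margin should be at least a few multiples of the local source-point spacing so that boundary triangles are well supported.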

PS:

The following is what I tried using the map_overlap function.

def maplNDIwrap(df, data_name='nir'):
    dtag = target_hexgrid(df, hex_res=12)
    slon, slat, data = df[['lon','lat',data_name]].values.T
    tlon, tlat = dtag[['lon','lat']].values.T
    tout = lNDI(list(zip(slon, slat)), data)(tlon, tlat)
    print(len(tlon), len(tout))

    dout = pd.DataFrame(np.vstack([tlon, tlat, tout]).T,
                        columns=['lon','lat',data_name]
                       )
    return dout

dtag = target_hexgrid(dsrc.compute(), hex_res=12)
#--- meta should be an empty pandas DataFrame describing the output schema,
#--- not the type of the input collection.
meta = pd.DataFrame({'lon': pd.Series(dtype=float),
                     'lat': pd.Series(dtype=float),
                     'nir': pd.Series(dtype=float)})
gd_map_lNDI = dsrc.map_overlap(maplNDIwrap, 1, 1, meta=meta)

print(len(dtag))          #--> output: 353678
gd_map_lNDI.compute()     #--> size does not match the expected 353678

Printed messages (screenshot).

Updates:

Here I define a few functions to generate a synthetic dataset.

def lonlat(lon_min, lon_max, lat_min, lat_max, res=5):
    #--- Grid sizes: ~110 km per degree, divided by the resolution in metres.
    xps = round((lon_max - lon_min) * 110 * 1e3 / res)
    yps = round((lat_max - lat_min) * 110 * 1e3 / res)
    return np.meshgrid(np.linspace(lon_min, lon_max, xps),
                       np.linspace(lat_min, lat_max, yps)
                      )

def xy_based_map(x, y):
    #--- A smooth synthetic field over (lon, lat), converted to radians.
    x = np.pi * x / 180
    y = np.pi * y / 180
    return np.log10((1 - x / 3. + x ** 2 + (2 * x * y) ** 3) * np.exp(-x ** 2 - y ** 2))

Using the same method I showed above, the results are below.
You can definitely see that there are lines in the interpolation outputs.

lon_min, lon_max, lat_min, lat_max = -70.42,-70.40,-30.42,-30.40

lon05, lat05 = lonlat(lon_min, lon_max, lat_min, lat_max,res=5)
z05 = xy_based_map(lon05,lat05)
df05= pd.DataFrame(np.vstack((lon05.ravel(), lat05.ravel(), z05.ravel())).T, columns=['lon','lat','z'])
df05= dd.from_pandas(df05, npartitions=n_splits)

lon30, lat30 = lonlat(lon_min, lon_max, lat_min, lat_max,res=30)
z30 = xy_based_map(lon30,lat30)
df30= pd.DataFrame(np.vstack((lon30.ravel(), lat30.ravel(), z30.ravel())).T, columns=['lon','lat','z'])
df30= dd.from_pandas(df30, npartitions=n_splits)
###--- Build the 10 m source grid (referenced as df10 below but missing above)
lon10, lat10 = lonlat(lon_min, lon_max, lat_min, lat_max, res=10)
z10 = xy_based_map(lon10, lat10)
df10 = pd.DataFrame(np.vstack((lon10.ravel(), lat10.ravel(), z10.ravel())).T, columns=['lon','lat','z'])
df10 = dd.from_pandas(df10, npartitions=n_splits)

###--- 10m --> 5m
tlon, tlat = df05[['lon','lat']].values.T
slon, slat, data = df10.values.T
gd_chunked_lNDI = [delayed(lNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in \
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

gd_chunked_cNDI = [delayed(cNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in \
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

gd_chunked_rNDI = [delayed(rNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in \
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

gd_lNDI = delayed(da.concatenate)(gd_chunked_lNDI, axis=0)
gd_cNDI = delayed(da.concatenate)(gd_chunked_cNDI, axis=0)
gd_rNDI = delayed(da.concatenate)(gd_chunked_rNDI, axis=0)

results_lNDI_10m = np.array(gd_lNDI.compute())
results_cNDI_10m = np.array(gd_cNDI.compute())
results_rNDI_10m = np.array(gd_rNDI.compute())

#--- No parallel computing
a,b,c,d,e = slon.compute(),slat.compute(),data.compute(),tlon.compute(),tlat.compute()
straight_lNDI_10m = lNDIwrap(a,b,c,d,e)
###--- 30m --> 5m
tlon, tlat = df05[['lon','lat']].values.T
slon, slat, data = df30.values.T
gd_chunked_lNDI = [delayed(lNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in \
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

gd_chunked_cNDI = [delayed(cNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in \
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

gd_chunked_rNDI = [delayed(rNDIwrap)(x1, y1, newarr, xx, yy) for x1, y1, newarr, xx, yy in \
                   zip(slon.to_delayed().flatten(),
                       slat.to_delayed().flatten(),
                       data.to_delayed().flatten(),
                       tlon.to_delayed().flatten(),
                       tlat.to_delayed().flatten())]

gd_lNDI = delayed(da.concatenate)(gd_chunked_lNDI, axis=0)
gd_cNDI = delayed(da.concatenate)(gd_chunked_cNDI, axis=0)
gd_rNDI = delayed(da.concatenate)(gd_chunked_rNDI, axis=0)

results_lNDI_30m = np.array(gd_lNDI.compute())
results_cNDI_30m = np.array(gd_cNDI.compute())
results_rNDI_30m = np.array(gd_rNDI.compute())

###--- No Parallel for 30m --> 5m
a,b,c,d,e = slon.compute(),slat.compute(),data.compute(),tlon.compute(),tlat.compute()
straight_lNDI_30m = lNDIwrap(a,b,c,d,e)
###--- For plots.

dout= pd.DataFrame(np.vstack((tlon.compute(),tlat.compute(),df05.z.values.compute(),
                              results_lNDI_10m,results_cNDI_10m, results_rNDI_10m,straight_lNDI_10m,
                              results_lNDI_30m,results_cNDI_30m, results_rNDI_30m,straight_lNDI_30m,
                             )).T, columns=['lon','lat','orig','lNDI10','cNDI10','rNDI10','stgh10','lNDI30','cNDI30','rNDI30','stgh30'])

Synthetic Data Interp-Outputs
