Python Pandas:分区尺寸小于重叠的窗口大小

发布于 01-21 22:32 字数 4489 浏览 0 评论 0 原文

我正在尝试使用 pandas 使用 switfer 来计算时间序列的斜率。

我的代码:

import os.path
from os import listdir
from os.path import isfile, join

import numpy as np
import pandas_ta
from scipy.stats import linregress

import pandas as pd
import swifter

FILE_PATH = "Data"


def get_files_ohlc(path: str):
    return [f for f in listdir(path) if isfile(join(path, f))]


def get_slope(array):
    y = np.array(array)
    x = np.arange(len(y))
    slope, intercept, r_value, p_value, std_err = linregress(x, y)
    return slope


def add_slop_indicator(ohlc: pd.DataFrame, ind: str, candles_back: int):
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
        get_slope, raw=True)


if __name__ == '__main__':
    files = get_files_ohlc(FILE_PATH)
    dicts = {}

    for file in files:
        name_pair = file.split("_")[0]
        dicts[name_pair] = pd.read_json(os.path.join(FILE_PATH, file))
        dicts[name_pair].rename({0: 'date',
                                 1: 'open',
                                 2: 'high',
                                 3: 'low',
                                 4: 'close',
                                 5: 'volume'}, axis=1, inplace=True)
        dicts[name_pair]['date'] = dicts[name_pair]['date'].values.astype(dtype='datetime64[ms]')
        for val in range(20, 100):
            dicts[name_pair].ta.ema(length=val, append=True)

        print(f"END_{name_pair}")
        for val in range(20, 100):
            for days in range(5, 100):
                add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
        print(f"DONE {name_pair}")

但是在运行一段时间后,我将收到此错误消息,并且程序将停止运行。

错误:

dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.

更新:

我已经更新了代码,以使其成为一个简单的可行示例。

pandas_ta (用于 dicts [name_pair] .ta ) - >是这个将允许我使用ema(只是指示器

)可以在此 test.py 中运行代码,它将输出我要问的错误消息。

希望这会有所帮助,否则请让我知道需要澄清什么。

我的追溯:

Traceback (most recent call last):
  File "/home/vlad/Crypto_15m_data/test.py", line 51, in <module>
    add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
  File "/home/vlad/Crypto_15m_data/test.py", line 27, in add_slop_indicator
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 521, in apply
    return self._dask_apply(func, *args, **kwds)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 562, in _dask_apply
    dd.from_pandas(self._comparison_pd, npartitions=self._npartitions)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 292, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 220, in get
    result = get_async(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 110, in reraise
    raise exc
dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.

Traceback
---------
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/dataframe/rolling.py", line 29, in overlap_chunk
    raise NotImplementedError(msg)


Process finished with exit code 1

I am trying to compute the slope of a time series using Pandas with Switfer by doing this.

My code:

import os.path
from os import listdir
from os.path import isfile, join

import numpy as np
import pandas_ta
from scipy.stats import linregress

import pandas as pd
import swifter

FILE_PATH = "Data"


def get_files_ohlc(path: str):
    return [f for f in listdir(path) if isfile(join(path, f))]


def get_slope(array):
    y = np.array(array)
    x = np.arange(len(y))
    slope, intercept, r_value, p_value, std_err = linregress(x, y)
    return slope


def add_slop_indicator(ohlc: pd.DataFrame, ind: str, candles_back: int):
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
        get_slope, raw=True)


if __name__ == '__main__':
    files = get_files_ohlc(FILE_PATH)
    dicts = {}

    for file in files:
        name_pair = file.split("_")[0]
        dicts[name_pair] = pd.read_json(os.path.join(FILE_PATH, file))
        dicts[name_pair].rename({0: 'date',
                                 1: 'open',
                                 2: 'high',
                                 3: 'low',
                                 4: 'close',
                                 5: 'volume'}, axis=1, inplace=True)
        dicts[name_pair]['date'] = dicts[name_pair]['date'].values.astype(dtype='datetime64[ms]')
        for val in range(20, 100):
            dicts[name_pair].ta.ema(length=val, append=True)

        print(f"END_{name_pair}")
        for val in range(20, 100):
            for days in range(5, 100):
                add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
        print(f"DONE {name_pair}")

But after running it for a while I will get this error message and the program will stop running.

Error:

dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.

UPDATE:

I've updated my code to make it a simple workable example.

pandas_ta (is used for dicts[name_pair].ta) -> is this library which is going to allow me to use EMA (which is just an indicator)

All the code can be found in this repository that I've created for this question. If you will run the code from test.py it will output that error message that I am asking about.

Hope that this would help, otherwise please let me know what would needs to be clarified.

My traceback:

Traceback (most recent call last):
  File "/home/vlad/Crypto_15m_data/test.py", line 51, in <module>
    add_slop_indicator(dicts[name_pair], f"EMA_{val}", days)
  File "/home/vlad/Crypto_15m_data/test.py", line 27, in add_slop_indicator
    ohlc[f'slope_{val}_{days}'] = ohlc[ind].swifter.rolling(window=candles_back, min_periods=candles_back).apply(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 521, in apply
    return self._dask_apply(func, *args, **kwds)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/swifter/swifter.py", line 562, in _dask_apply
    dd.from_pandas(self._comparison_pd, npartitions=self._npartitions)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 292, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/base.py", line 575, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 220, in get
    result = get_async(
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 508, in get_async
    raise_exception(exc, tb)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/multiprocessing.py", line 110, in reraise
    raise exc
dask.multiprocessing.NotImplementedError: Partition size is less than overlapping window size. Try using ``df.repartition`` to increase the partition size.

Traceback
---------
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/local.py", line 221, in execute_task
    result = _execute_task(task, data)
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/home/vlad/Crypto_15m_data/venv/lib/python3.10/site-packages/dask/dataframe/rolling.py", line 29, in overlap_chunk
    raise NotImplementedError(msg)


Process finished with exit code 1

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文