Converting a Spark DataFrame to a Dask DataFrame

Posted on 2025-01-18 16:25:50

First, I write a Spark DataFrame named calendar as a Parquet file named cal.

calendar.write.parquet("/user/vusal.babashov/dataset/cal", mode="overwrite")

Then I copy it from Hadoop to my personal folder.

!hdfs dfs -copyToLocal -f /user/vusal.babashov/dataset/cal /home/vusal.babashov/dataset

Finally, I try to read the Parquet output into Dask.

import dask.dataframe as dd

df = dd.read_parquet('/home/vusal.babashov/dataset/cal')

This last step is where I keep getting OSError: [Errno 22] Invalid argument. The path is correct, and the Parquet file named cal is there (I can confirm).

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-12-372cb3d97d10> in <module>
      1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
      3 df

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    325         chunksize=chunksize,
    326         aggregate_files=aggregate_files,
--> 327         **kwargs,
    328     )
    329 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
    732         # correspond to a row group (populated below).
    733         parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734             fs, paths, gather_statistics, **kwargs
    735         )
    736 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
    163         elif gather_statistics is not False:
    164             # Scan every file
--> 165             pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
    166         else:
    167             # Use _common_metadata file if it is available.

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
     91         if isinstance(fn, (tuple, list)):
     92             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93                                                open_with=open_with, root=root)
     94             if basepath:
     95                 self.fn = join_path(basepath, '_metadata')  # effective file

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
    119                 self.fn = join_path(fn)
    120                 with open_with(fn, 'rb') as f:
--> 121                     self._parse_header(f, verify)
    122             elif "*" in fn or fs.isdir(fn):
    123                 fn2 = join_path(fn, '_metadata')

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
    161                 if verify:
    162                     assert f.read(4) == b'PAR1'
--> 163                 f.seek(-8, 2)
    164                 head_size = struct.unpack('<i', f.read(4))[0]
    165                 if verify:

OSError: [Errno 22] Invalid argument

When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow'), I get the following error:


RuntimeError                              Traceback (most recent call last)
<ipython-input-1-9b03dc4d018b> in <module>
      1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow')
      3 df

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    290 
    291     if isinstance(engine, str):
--> 292         engine = get_engine(engine)
    293 
    294     if hasattr(path, "name"):

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in get_engine(engine)
    917 
    918         if pa_version < parse_version("0.13.1"):
--> 919             raise RuntimeError("PyArrow version >= 0.13.1 required")
    920 
    921         if engine == "pyarrow-dataset" and pa_version.major >= 1:

RuntimeError: PyArrow version >= 0.13.1 required

I'm using Spark 2.4 and Python 3.7 on the server. The PyArrow version I have is 0.11.1, and upgrading to another version causes an inconsistent environment. According to the Dask documentation, the default engine is auto, which selects fastparquet (I have it installed). When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto'), I get the same OSError: [Errno 22] Invalid argument:


--
OSError                                   Traceback (most recent call last)
<ipython-input-8-361b3123f3d5> in <module>
      1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto')
      3 df

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
    325         chunksize=chunksize,
    326         aggregate_files=aggregate_files,
--> 327         **kwargs,
    328     )
    329 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
    732         # correspond to a row group (populated below).
    733         parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734             fs, paths, gather_statistics, **kwargs
    735         )
    736 

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
    163         elif gather_statistics is not False:
    164             # Scan every file
--> 165             pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
    166         else:
    167             # Use _common_metadata file if it is available.

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
     91         if isinstance(fn, (tuple, list)):
     92             basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93                                                open_with=open_with, root=root)
     94             if basepath:
     95                 self.fn = join_path(basepath, '_metadata')  # effective file

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
    145 
    146         if verify_schema or fs is None or len(file_list) < 3:
--> 147             pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
    148         else:
    149             # activate new code path here

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
    119                 self.fn = join_path(fn)
    120                 with open_with(fn, 'rb') as f:
--> 121                     self._parse_header(f, verify)
    122             elif "*" in fn or fs.isdir(fn):
    123                 fn2 = join_path(fn, '_metadata')

~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
    161                 if verify:
    162                     assert f.read(4) == b'PAR1'
--> 163                 f.seek(-8, 2)
    164                 head_size = struct.unpack('<i', f.read(4))[0]
    165                 if verify:

OSError: [Errno 22] Invalid argument

[Image: snapshot of the Parquet files]


Comments (1)

泪是无色的血 2025-01-25 16:25:50

The "Invalid argument" error means that the code is trying to seek() to a position that does not exist in the file. When reading the metadata footer of a Parquet file, the last 8 bytes contain the size of the footer, which is why the reader calls seek(-8, 2) (8 bytes before the end of the file). If a file is shorter than 8 bytes, that position lies before the start of the file and the seek fails.
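The failing seek can be reproduced without Dask or fastparquet. The following is a minimal sketch, not fastparquet's actual code: read_footer_length and seek_error_on_empty_file are hypothetical helper names, and the zero-length temporary file stands in for a marker file such as _SUCCESS.

```python
import errno
import os
import struct
import tempfile


def read_footer_length(path):
    """Return the footer length stored in the last 8 bytes of a Parquet file."""
    with open(path, "rb") as f:
        # whence=2 means "relative to the end of the file"
        f.seek(-8, 2)
        tail = f.read(8)
    # Layout of the tail: 4-byte little-endian footer length, then b"PAR1"
    (length,) = struct.unpack("<i", tail[:4])
    if tail[4:] != b"PAR1":
        raise ValueError("not a Parquet file: missing PAR1 magic")
    return length


def seek_error_on_empty_file():
    """Run the same read on a zero-length file and return the errno raised."""
    fd, path = tempfile.mkstemp()
    os.close(fd)
    try:
        read_footer_length(path)
    except OSError as exc:
        return exc.errno
    finally:
        os.remove(path)
    return None
```

Calling seek_error_on_empty_file() should return errno.EINVAL (22), the same error number that appears in the traceback.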

Very likely the situation is this: Spark has written some extra files into the directory that are not data files, such as the zero-length "_SUCCESS" marker file and any checksum files, but has not written a _metadata file to tell Dask which files are to be regarded as data.
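One way to check this guess is to look at every file in the directory and see which ones carry the Parquet magic bytes. This is a rough diagnostic sketch using only the standard library; looks_like_parquet and classify_directory are hypothetical helper names, and the check is deliberately shallow (a _metadata file, if present, would also pass it).

```python
from pathlib import Path

MAGIC = b"PAR1"


def looks_like_parquet(path):
    """True if the file starts and ends with the Parquet magic bytes."""
    path = Path(path)
    # Smallest conceivable file: magic + 4-byte footer length + magic
    if path.stat().st_size < 12:
        return False
    with open(path, "rb") as f:
        head = f.read(4)
        f.seek(-4, 2)
        tail = f.read(4)
    return head == MAGIC and tail == MAGIC


def classify_directory(directory):
    """Split the regular files in a directory into (data_files, other_files)."""
    data, other = [], []
    for p in sorted(Path(directory).iterdir()):
        if p.is_file():
            (data if looks_like_parquet(p) else other).append(p.name)
    return data, other
```

Running classify_directory('/home/vusal.babashov/dataset/cal') would list any non-Parquet files in the second element of the result.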

I very probably fixed this issue of filtering for real data files in the latest dask and fastparquet (and probably arrow). If you cannot update, then you will need to remove the files that are causing the confusion.
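If updating is not an option, the cleanup could look like the sketch below. It assumes the offending files are the _SUCCESS marker and hidden checksum files named like .<name>.crc; remove_non_data_files is a hypothetical helper, and it defaults to a dry run so nothing is deleted until the list has been reviewed.

```python
from pathlib import Path


def remove_non_data_files(directory, dry_run=True):
    """Delete Spark marker and checksum files; return the names affected."""
    removed = []
    for p in sorted(Path(directory).iterdir()):
        is_marker = p.name == "_SUCCESS"
        # Assumption: checksum files are hidden and end in ".crc"
        is_checksum = p.name.startswith(".") and p.name.endswith(".crc")
        if p.is_file() and (is_marker or is_checksum):
            if not dry_run:
                p.unlink()
            removed.append(p.name)
    return removed
```

After reviewing the dry-run output and calling the function again with dry_run=False, dd.read_parquet('/home/vusal.babashov/dataset/cal') can be retried. Passing an explicit list of the data file paths to dd.read_parquet is a non-destructive alternative.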
