CONVERTING A SPARK DATAFRAME TO A DASK DATAFRAME
First, I'm writing a Spark DataFrame named calendar as a Parquet file named cal.
calendar.write.parquet("/user/vusal.babashov/dataset/cal", mode="overwrite")
Then, I'm copying it from Hadoop to my personal folder.
!hdfs dfs -copyToLocal -f /user/vusal.babashov/dataset/cal /home/vusal.babashov/dataset
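(For illustration, not part of the original steps:) a quick way to see exactly what that copy leaves on local disk. Spark normally writes part-*.parquet data files plus a zero-length _SUCCESS marker, and -copyToLocal adds hidden .crc checksum files next to everything it copies.

import os

target = '/home/vusal.babashov/dataset/cal'
for name in sorted(os.listdir(target)):
    size = os.path.getsize(os.path.join(target, name))
    print(f'{size:>12}  {name}')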
Finally, I'm trying to read the parquet into Dask.
import dask.dataframe as dd
df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
This last step is where I keep getting an OSError: [Errno 22] Invalid argument. The path is correct, and the Parquet file named cal is there (I can confirm it).
---------------------------------------------------------------------------
OSError Traceback (most recent call last)
<ipython-input-12-372cb3d97d10> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325 chunksize=chunksize,
326 aggregate_files=aggregate_files,
--> 327 **kwargs,
328 )
329
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732 # correspond to a row group (populated below).
733 parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734 fs, paths, gather_statistics, **kwargs
735 )
736
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163 elif gather_statistics is not False:
164 # Scan every file
--> 165 pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166 else:
167 # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91 if isinstance(fn, (tuple, list)):
92 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93 open_with=open_with, root=root)
94 if basepath:
95 self.fn = join_path(basepath, '_metadata') # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119 self.fn = join_path(fn)
120 with open_with(fn, 'rb') as f:
--> 121 self._parse_header(f, verify)
122 elif "*" in fn or fs.isdir(fn):
123 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161 if verify:
162 assert f.read(4) == b'PAR1'
--> 163 f.seek(-8, 2)
164 head_size = struct.unpack('<i', f.read(4))[0]
165 if verify:
OSError: [Errno 22] Invalid argument
When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow'), I get the following error:
RuntimeError Traceback (most recent call last)
<ipython-input-1-9b03dc4d018b> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal.parquet', engine='pyarrow')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
290
291 if isinstance(engine, str):
--> 292 engine = get_engine(engine)
293
294 if hasattr(path, "name"):
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in get_engine(engine)
917
918 if pa_version < parse_version("0.13.1"):
--> 919 raise RuntimeError("PyArrow version >= 0.13.1 required")
920
921 if engine == "pyarrow-dataset" and pa_version.major >= 1:
RuntimeError: PyArrow version >= 0.13.1 required
I'm using Spark 2.4 and Python 3.7 on the server. The PyArrow version I have is 0.11.1, and upgrading it to another version causes an inconsistent environment. According to the Dask documentation, the default engine is 'auto', which selects fastparquet (I have it installed). When I run df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto'), I get the same OSError: [Errno 22] Invalid argument:
--
OSError Traceback (most recent call last)
<ipython-input-8-361b3123f3d5> in <module>
1 import dask.dataframe as dd
----> 2 df = dd.read_parquet('/home/vusal.babashov/dataset/cal', engine='auto')
3 df
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
325 chunksize=chunksize,
326 aggregate_files=aggregate_files,
--> 327 **kwargs,
328 )
329
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, chunksize, aggregate_files, **kwargs)
732 # correspond to a row group (populated below).
733 parts, pf, gather_statistics, base_path = _determine_pf_parts(
--> 734 fs, paths, gather_statistics, **kwargs
735 )
736
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/dask/dataframe/io/parquet/fastparquet.py in _determine_pf_parts(fs, paths, gather_statistics, **kwargs)
163 elif gather_statistics is not False:
164 # Scan every file
--> 165 pf = ParquetFile(paths, open_with=fs.open, **kwargs.get("file", {}))
166 else:
167 # Use _common_metadata file if it is available.
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
91 if isinstance(fn, (tuple, list)):
92 basepath, fmd = metadata_from_many(fn, verify_schema=verify,
---> 93 open_with=open_with, root=root)
94 if basepath:
95 self.fn = join_path(basepath, '_metadata') # effective file
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in metadata_from_many(file_list, verify_schema, open_with, root, fs)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/util.py in <listcomp>(.0)
145
146 if verify_schema or fs is None or len(file_list) < 3:
--> 147 pfs = [api.ParquetFile(fn, open_with=open_with) for fn in file_list]
148 else:
149 # activate new code path here
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in __init__(self, fn, verify, open_with, root, sep, fs)
119 self.fn = join_path(fn)
120 with open_with(fn, 'rb') as f:
--> 121 self._parse_header(f, verify)
122 elif "*" in fn or fs.isdir(fn):
123 fn2 = join_path(fn, '_metadata')
~/anaconda3/envs/notebookEnv/lib/python3.7/site-packages/fastparquet/api.py in _parse_header(self, f, verify)
161 if verify:
162 assert f.read(4) == b'PAR1'
--> 163 f.seek(-8, 2)
164 head_size = struct.unpack('<i', f.read(4))[0]
165 if verify:
OSError: [Errno 22] Invalid argument
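For reference, a quick (illustrative) check of which Parquet engines and versions this environment actually exposes, which is what determines what engine='auto' can pick:

import dask
import fastparquet

print('dask        ', dask.__version__)
print('fastparquet ', fastparquet.__version__)
try:
    import pyarrow
    print('pyarrow     ', pyarrow.__version__)   # 0.11.1 here, below Dask's 0.13.1 minimum
except ImportError:
    print('pyarrow      not installed')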
Comments (1)
The invalid argument means that the code is trying to seek() to a location in the file that doesn't exist. When reading the metadata footer of a Parquet file, the last 8 bytes contain the size of the footer, which is why you see the seek(-8, 2) (8 bytes before the end of the file).
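Here is a minimal sketch of that footer check (the helper and path below are illustrative, not part of the original answer; the logic mirrors what fastparquet does rather than calling it). Any entry in the copied directory that is shorter than 8 bytes, or that doesn't end in the PAR1 magic, will make that relative seek fail:

import os
import struct

def looks_like_parquet(path):
    # A real Parquet file ends with a 4-byte footer length followed by b'PAR1'.
    size = os.path.getsize(path)
    if size < 8:
        return False, f'only {size} bytes, too small to hold a footer'
    with open(path, 'rb') as f:
        f.seek(-8, 2)                        # the same relative seek fastparquet performs
        footer_len = struct.unpack('<i', f.read(4))[0]
        magic = f.read(4)
    if magic != b'PAR1':
        return False, 'missing PAR1 magic, not a Parquet file'
    return True, f'footer length {footer_len} bytes'

directory = '/home/vusal.babashov/dataset/cal'
for name in sorted(os.listdir(directory)):
    ok, detail = looks_like_parquet(os.path.join(directory, name))
    print('OK ' if ok else 'BAD', name, '-', detail)

Running something like this against the directory should flag exactly the entries (for example a zero-length _SUCCESS file) that trip up the metadata scan.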
Very likely the situation is this: Spark has written some extra files into the directory that are not data files, such as the odd zero-length "_SUCCESS" marker and any checksums, but has not written a _metadata file to tell Dask which files are to be regarded as data. I very probably fixed this issue of filtering for real files in the latest Dask and fastparquet (and probably Arrow as well). If you cannot update, then you will need to remove the files that are causing the confusion.
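A hedged sketch of the two ways to act on that, assuming Spark's usual output layout of part-*.parquet data files plus a _SUCCESS marker and .crc checksum files (the patterns below are assumptions; adjust them to what the directory actually contains):

import glob
import os

import dask.dataframe as dd

directory = '/home/vusal.babashov/dataset/cal'

# Option 1: delete everything that is not a data file, then read the directory whole.
for name in os.listdir(directory):
    full = os.path.join(directory, name)
    if os.path.isfile(full) and not name.endswith('.parquet'):   # drops _SUCCESS, *.crc, ...
        os.remove(full)
df = dd.read_parquet(directory, engine='fastparquet')

# Option 2: leave the directory alone and hand Dask an explicit list of data files.
data_files = sorted(glob.glob(os.path.join(directory, 'part-*.parquet')))
df = dd.read_parquet(data_files, engine='fastparquet')

Either way the metadata scan only ever touches real Parquet files, so the seek(-8, 2) succeeds.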