Is it better to read a very large file into a pandas DataFrame at once, or to read the file step by step?
I have run my script on an instance with 18 GB of RAM, 4 CPUs, and 20 GB of disk in both use cases.
- First use case (read line by line):
Read the file line by line and process every 500,000 lines: the script cleans the data (adds columns, converts to specific types), converts the chunk to a particular file type (Parquet), and loads the data to S3, all with the pandas library. I want to clarify that each 500,000-line chunk is processed concurrently by a child process (I am using multiprocessing); a simplified sketch of what each worker does follows the snippet below.
My core script in the program:
s3_object = s3r.Object(bucket_name=cfg['BUCKETS']['INPUT_PATH'], key=s3_key_input)
chunk = ""
i = 0
err = 0
fs = []
columns_name_list, sep = build_custom_scheme(s3r, params, fs_config, cfg)
if sep == "\\t":
    sep = "\t"
dataset_config = {"colunms": columns_name_list, "separator": sep}

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    # Stream the S3 object line by line, accumulating text until a chunk is full
    for ln in s3_object.get()['Body'].iter_lines():
        try:
            i += 1
            chunk += str(ln, 'utf-8') + '\n'
        except Exception:
            err += 1
            _logger.warning("Numero de errores de encoding {0} : ".format(err))
        if i % 500000 == 0:
            _logger.info("Procesando {0} registros: ".format(i))
            # Hand the accumulated chunk off to a worker process
            fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
            chunk = ""
    # Submit whatever is left over after the loop ends
    _logger.info("Procesando {0} registros: ".format(i))
    fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
    for idx, f in enumerate(concurrent.futures.as_completed(fs)):
        print(f"Process {idx} - result {f.result()}")
- Second use case (read the whole file with pandas, no multiprocessing):
df = pd.concat(pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name,
                           sep=separator, header=None, chunksize=5000, index_col=False))
The second case is too slow when reading very large files, while the first use case handles files of up to 5 GB in about 3 minutes on average. Should I stick with just the first use case, or could the second use case work with another library such as Dask?
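For reference, I imagine the Dask variant of the second use case would look roughly like this (an untested sketch; the s3:// output path is just a placeholder, and columns_name / separator are the same variables as above):

import dask.dataframe as dd

# Read the CSV lazily in partitions instead of loading everything into one pandas DataFrame
df = dd.read_csv(
    tmp,
    dtype=str,
    na_filter=False,
    names=columns_name,
    sep=separator,
    header=None,
    blocksize="256MB",  # size of each partition pulled into memory
)

# Write the result as Parquet; "s3://my-bucket/output/" is a placeholder path
df.to_parquet("s3://my-bucket/output/", write_index=False)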
1 Answer
Do you run into the same issues running the second case like this? It'll take some setup to implement, but you could also try pyspark.pandas, which gives you the benefits of pyspark without having to learn something new:
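A rough sketch of what that could look like (assuming pyspark >= 3.2; the s3a:// output path is a placeholder, and tmp, columns_name, and separator are the question's own variables):

import pyspark.pandas as ps

# Read the CSV with pandas-style arguments, distributed across the Spark cluster
psdf = ps.read_csv(
    tmp,
    names=columns_name,
    sep=separator,
    header=None,
    dtype=str,
)

# ... same cleaning logic as with plain pandas ...

# Write Parquet straight to S3; "s3a://my-bucket/output/" is a placeholder path
psdf.to_parquet("s3a://my-bucket/output/")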