Is it more efficient to read a very large file into a pandas DataFrame all at once, or to read and process it step by step in chunks?


I have run my script on an instance with 18 GB of RAM, 4 CPUs, and 20 GB of disk for both use cases.

  1. First use case (read line by line):

Read the file line by line and, every 500,000 lines, clean the data (add columns, convert to specific types), convert the chunk to a particular file type (Parquet), and load it to S3, all done with the pandas library. I want to clarify that each batch of 500,000 lines is processed concurrently by a child process (I am using multiprocessing); a sketch of what this per-chunk worker might look like is included after the script below.

The core part of my script:

    # Open the input object in S3 and build the column schema and separator.
    s3_object = s3r.Object(bucket_name=cfg['BUCKETS']['INPUT_PATH'], key=s3_key_input)
    chunk = ""
    i = 0
    err = 0
    fs = []
    columns_name_list, sep = build_custom_scheme(s3r, params, fs_config, cfg)

    if sep == "\\t":
        sep = "\t"
    dataset_config = {"colunms": columns_name_list, "separator": sep}  # key spelled "colunms" throughout

    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:

        # Stream the S3 object line by line and accumulate a text chunk.
        for ln in s3_object.get()['Body'].iter_lines():
            try:
                i += 1
                chunk += str(ln, 'utf-8') + '\n'
            except Exception:
                err += 1
                _logger.warning("Numero de errores de encoding {0} : ".format(err))

            # Every 500,000 lines, hand the chunk to a child process.
            if i % 500000 == 0:
                _logger.info("Procesando {0} registros: ".format(i))
                fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
                chunk = ""

        # Submit whatever is left after the last full chunk.
        _logger.info("Procesando {0} registros: ".format(i))
        fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))

        # Wait for the child processes and retrieve their results.
        for n, f in enumerate(concurrent.futures.as_completed(fs)):
            print(f"Process {n} - result {f.result()}")
  2. Second use case: read the whole file with pandas (no multiprocessing):

    df = pd.concat(tchunk for tchunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name,
                                                    sep=separator, header=None, chunksize=5000, index_col=False))

The second case is too slow when reading very large files, while the first use case handles files of up to 5 GB in an average of 3 minutes. So, should I stick with just the first use case, or maybe use the second approach with another library such as Dask?
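For reference, here is a minimal sketch of what the chunked read might look like with Dask instead of plain pandas; the output path is hypothetical, and it assumes dask (and s3fs, if reading or writing directly from S3) is installed:

    import dask.dataframe as dd

    # Dask splits the file into partitions and reads/processes them in parallel.
    ddf = dd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name,
                      sep=separator, header=None, blocksize="64MB")

    # Cleaning steps would be applied lazily here, then written out in parallel.
    ddf.to_parquet("s3://output-bucket/dataset/", write_index=False)  # hypothetical output location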


Comments (1)

沒落の蓅哖 2025-01-29 06:45:40


Do you run into the same issues running the second case like this?

df = pd.DataFrame()
for chunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=500000, index_col=False):
    df = pd.concat([df, chunk])
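One variant worth trying: concatenating inside the loop copies the accumulated data on every iteration, so collecting the chunks in a list and concatenating once at the end is usually noticeably faster (same parameters as above):

chunks = []
for chunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=500000, index_col=False):
    chunks.append(chunk)
# Concatenate once instead of growing the DataFrame on every iteration.
df = pd.concat(chunks, ignore_index=True)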

It will take some setup to implement, but you could also try pyspark.pandas, which gives you the benefits of PySpark without having to learn something new:

import pyspark.pandas as ps

df = ps.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, index_col=False)