使用人造丝处理并行文件处理

发布于 2025-01-20 12:19:07 字数 1821 浏览 3 评论 0原文

我的本地源文件夹中有 7 个 CSV 文件(每个 55 MB),我想将其转换为 JSON 格式并存储到本地文件夹中。我的操作系统是 MacOS(四核 Intel i5)。 基本上,它是一个简单的 Rust 程序,从控制台运行,因为

./target/release/convert <source-folder> <target-folder>

我使用 Rust 线​​程的多线程方法很糟糕,

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;

    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for h in handles {
        let _ = h.join();
    }

    Ok(())
}

我使用 time 运行它来测量 CPU 利用率,

2.93s user 0.55s system 316% cpu 1.098 total

然后我尝试实现相同的任务使用rayon(线程池)板条箱:

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;
    let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;

    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        pool.install(|| {
            let _ = convert(&source_path, &target_path);
        });
    }

    Ok(())
}

我使用time运行它来测量CPU利用率,这使得

2.97s user 0.53s system 98% cpu 3.561 total

我在使用rayon时看不到任何改进。我可能以错误的方式使用人造丝。 有谁知道它有什么问题吗?

更新(4 月 9 日)

经过一段时间与 Rust 检查器的斗争

I've got 7 CSV files (55 MB each) in my local source folder which I want to convert into JSON format and store into local folder. My OS is MacOS (Quad-Core Intel i5).
Basically, it is a simple Rust program which is run from a console as

./target/release/convert <source-folder> <target-folder>

My multithreading approach using Rust threads is ass following

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;

    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for h in handles {
        let _ = h.join();
    }

    Ok(())
}

I run it using time to measure CPU utilisation which gives

2.93s user 0.55s system 316% cpu 1.098 total

Then I try to implement the same task using the rayon (threadpool) crate:

fn main() -> Result<()> {
    let source_dir = PathBuf::from(get_first_arg()?);
    let target_dir = PathBuf::from(get_second_arg()?);

    let paths = get_file_paths(&source_dir)?;
    let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;

    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;

        pool.install(|| {
            let _ = convert(&source_path, &target_path);
        });
    }

    Ok(())
}

I run it using time to measure CPU utilisation which gives

2.97s user 0.53s system 98% cpu 3.561 total

I can't see any improvements when I use rayon. I probably use rayon the wrong way.
Does anyone have an idea what is wrong with it?

Update (09 Apr)

After some time of fight with the Rust checker ????, just want to share a solution, maybe it could help others, or anyone else could suggest a better approach/solution

pool.scope(move |s| {
        for source_path in paths {
            let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
            s.spawn(move |_s| {
                convert(&source_path, &target_path).unwrap();
            });
        }
    });

But still does not beat the approach using rust std::thread for 113 files.

46.72s user 8.30s system 367% cpu 14.955 total

Update (10 Apr)

After @maxy comment

// rayon solution
paths.into_par_iter().for_each(|source_path| {
        let target_path = create_target_file_path(&source_path, &target_dir);

        match target_path {
            Ok(target_path) => {
                info!(
                    "Processing {}",
                    target_path.to_str().unwrap_or("Unable to convert")
                );
                let res = convert(&source_path, &target_path);
                if let Err(e) = res {
                    error!("{}", e);
                }
            }
            Err(e) => error!("{}", e),
        }
    });
    // std::thread solution
    let mut handles = vec![];
    for source_path in paths {
        let target_path = create_target_file_path(&source_path, &target_dir)?;
        handles.push(thread::spawn(move || {
            let _ = convert(&source_path, &target_path);
        }));
    }

    for handle in handles {
        let _ = handle.join();
    }

Comparison on 57 files:

std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon:        23.36s user 4.08s system 324% cpu 8.464 total

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

轮廓§ 2025-01-27 12:19:07

rayon install 的文档不是超级清楚,但签名:

pub fn install<OP, R>(&self, op: OP) -> R where
    R: Send,
    OP: FnOnce() -> R + Send, 

表示它返回类型R。与您的闭包返回的类型 R 相同。所以显然 install() 必须等待结果。

仅当闭包产生额外任务时,这才有意义,例如通过在闭包内使用 .par_iter() 。我建议直接使用 rayon 的 并行迭代器 (而不是你的 for 循环)遍历文件列表。您甚至不需要创建自己的线程池,默认池通常就可以了。

如果您坚持手动执行此操作,则必须使用 spawn() 而不是 install。并且您可能必须将循环移至传递给 范围()

The docu for rayon install is not super clear, but the signature:

pub fn install<OP, R>(&self, op: OP) -> R where
    R: Send,
    OP: FnOnce() -> R + Send, 

says it returns type R. The same type R that your closure returns. So obviously install() has to wait for the result.

This only makes sense if the closure spawns additional tasks, for example by using .par_iter() inside the closure. I suggest to use rayon's parallel iterators directly (instead of your for loop) over the list of files. You don't even need to create your own thread pool, the default pool is usually fine.

If you insist on doing it manually, you'll have to use spawn() instead of install. And you'll probably have to move your loop into a lambda passed to scope().

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文