在Rust WASM中的功能返回时创建线程

发布于 2025-02-01 22:58:28 字数 3584 浏览 4 评论 0原文

我正在WASM环境中使用Polars。

我已经注意到与leazyframe的不一致性。收集操作有时在使用某些数据集时会创建线程。

以下是与特定数据集使用的问题有关的代码,

#[wasm_bindgen]
pub fn start(buff: &[u8],
    item_id:&str, 
    order_id:&str,
    item_name:&str) -> JsValue{

    let cursor = Cursor::new(buff);
    let lf = CsvReader::new(cursor).with_ignore_parser_errors(true).finish().unwrap().lazy();


    let df = lf.groupby([col(order_id)]);
    let df = df.agg([col(item_id),col(item_name)]);
    
    // Error occurs here
    let df = df.collect().unwrap();

} 

为我提供了错误:

panicked at 'failed to spawn thread: Error { kind: Unsupported, message: "operation not supported on this platform" }'

因为它试图在WASM环境中产生线程。

但是,对于其他数据集,此过程将完美执行。而且它不会尝试创建线程。由于使用各种数据集测试,该问题似乎不是文件大小。

我想知道leazyframe的哪一部分。集合操作会造成这种不一致以及如何避免它。

working.csv

Order ID,Product ID,Product Name
InvoiceNo0,Product ID0,Product Name0
InvoiceNo0,Product ID1,Product Name1
InvoiceNo0,Product ID2,Product Name2
InvoiceNo0,Product ID3,Product Name3
InvoiceNo0,Product ID4,Product Name4
InvoiceNo0,Product ID5,Product Name5

notworking.csv

Order ID,Product ID,Product Name
B0000001,P0001,Product - 0001
B0000001,P0002,Product - 0002
B0000001,P0003,Product - 0003
B0000001,P0004,Product - 0004
B0000001,P0005,Product - 0005
B0000002,P0006,Product - 0006

允许WASM的Polars Fork由 https://github.com/universalmind303/polars/polars/tree/tree/wasm

您可以看到这里的完整项目以及两个CSV文件: https://github.com/kivalm/kivalm/lazyframe-min-min-test

docution_plan()

工作数据集的工作

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

数据集

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

的数据集输出()

工作数据集

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

不工作数据

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

集输出docution_optimized_plan():

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Product ID", "Product Name", "Order ID"];
    project 3/3 columns |   details: Some([col("Product ID"), col("Product Name"), col("Order ID")]);
    selection: "None"

edit:edit:edit: 仔细观察源代码后。这个问题似乎直接来自任何Porars代码。 我已经将问题跟踪到PORARS lazy/src/hythal_plan/executors/groupby.rs函数,

impl Executor for GroupByExec {
    fn execute

返回一个值

groupby_helper(df,keys,&self.aggs,self.apply.as_ref(),state,self.maintain_order,self.slice,)

然后从groupby_helper函数 ,运行到完成,并且数据框是成功创建的。当数据框从groupby_helper返回fn execute时出现错误。奇怪的是,仅在此函数返回时才尝试创建线程。 Rust Wasm中是否存在可能导致这样的行为?

I am working with Polars in a wasm environment.

I have noticed an inconsistency with the LazyFrame.collect operation where it sometimes creates threads when working with certain datasets.

Here is the code that relates to the issue

#[wasm_bindgen]
pub fn start(buff: &[u8],
    item_id:&str, 
    order_id:&str,
    item_name:&str) -> JsValue{

    let cursor = Cursor::new(buff);
    let lf = CsvReader::new(cursor).with_ignore_parser_errors(true).finish().unwrap().lazy();


    let df = lf.groupby([col(order_id)]);
    let df = df.agg([col(item_id),col(item_name)]);
    
    // Error occurs here
    let df = df.collect().unwrap();

} 

Working with a particular dataset provides me with the error:

panicked at 'failed to spawn thread: Error { kind: Unsupported, message: "operation not supported on this platform" }'

because it is attempting to spawn threads in a WASM environment.

However, with other datasets, this process would execute flawlessly. And it would not try to create the threads. The issue does not seem to be file size due to testing with various datasets.

I would like to know what part of the Lazyframe.collect operation creates this inconsistency and how to avoid it.

working.csv

Order ID,Product ID,Product Name
InvoiceNo0,Product ID0,Product Name0
InvoiceNo0,Product ID1,Product Name1
InvoiceNo0,Product ID2,Product Name2
InvoiceNo0,Product ID3,Product Name3
InvoiceNo0,Product ID4,Product Name4
InvoiceNo0,Product ID5,Product Name5

notworking.csv

Order ID,Product ID,Product Name
B0000001,P0001,Product - 0001
B0000001,P0002,Product - 0002
B0000001,P0003,Product - 0003
B0000001,P0004,Product - 0004
B0000001,P0005,Product - 0005
B0000002,P0006,Product - 0006

The Polars fork that allows wasm is provided by
https://github.com/universalmind303/polars/tree/wasm

You can see the full project here, as well as both CSV files:
https://github.com/KivalM/lazyframe-min-test

EDIT: Output of describe_plan()

working dataset

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

not working dataset

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Order ID", "Product ID", "Product Name"];
    project */3 columns |   details: None;
    selection: "None"

Output of schema()

working dataset

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

not working dataset

name: Order ID, data type: Utf8
name: Product ID, data type: Utf8
name: Product Name, data type: Utf8

output describe_optimized_plan():

    [col("Product ID"), col("Product Name")] BY [col("Order ID")] FROM DATAFRAME(in-memory): ["Product ID", "Product Name", "Order ID"];
    project 3/3 columns |   details: Some([col("Product ID"), col("Product Name"), col("Order ID")]);
    selection: "None"

EDIT:
After a closer look at the source code. the problem doesnt seem to be directly from any polars code.
I have tracked the issue down to polars-lazy/src/physical_plan/executors/groupby.rs Function

impl Executor for GroupByExec {
    fn execute

Which then returns a value from

groupby_helper(df,keys,&self.aggs,self.apply.as_ref(),state,self.maintain_order,self.slice,)

However, the groupby_helper function runs to completion, and the dataframe is successfully created. The error appears when the dataframe is being returned from groupby_helper to fn execute. It is odd that a thread is attempting to be created only when this function returns. Does there exist something in RUST WASM that could cause behaviour like this?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

娇纵 2025-02-08 22:58:28

因此,看来我在创建分支时错过的groupBys发生了一个std :: thread操作。

impl Drop for GroupsIdx {
    fn drop(&mut self) {
        let v = std::mem::take(&mut self.all);
        // ~65k took approximately 1ms on local machine, so from that point we drop on other thread
        // to stop query from being blocked
        if v.len() > 1 << 16 {
            std::thread::spawn(move || drop(v));
        } else {
            drop(v);
        }
    }
}

数据集大小是确定线程生成的原因。

任何大于1&lt;&lt; 16(〜65k)将产生线程。

功能标记impl仅在非WASM目标上编译应解决您的问题。

so it looks like there is a std::thread operation happening with the groupbys that I missed when creating the branch.

impl Drop for GroupsIdx {
    fn drop(&mut self) {
        let v = std::mem::take(&mut self.all);
        // ~65k took approximately 1ms on local machine, so from that point we drop on other thread
        // to stop query from being blocked
        if v.len() > 1 << 16 {
            std::thread::spawn(move || drop(v));
        } else {
            drop(v);
        }
    }
}

The dataset size is what is determining the thread spawn.

any group greater than 1 << 16 (~65k) will spawn a thread.

Feature flagging that impl to only compile on non-wasm targets should fix your issue.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文