将SQL查询读取到DASK数据框

发布于 2025-02-01 08:15:15 字数 2566 浏览 4 评论 0 原文

我正在尝试创建一个将SQL选择查询作为参数的函数,并使用dask使用 dask.read_sql_query function将其结果读取到dask dataframe中。我是Dask和Sqlalchemy的新手。 我首先尝试了这一点:

import dask.dataFrame as dd

query = "SELECT name, age, date_of_birth from customer"
df = dd.read_sql_query(sql=query, con=con_string, index_col="name", npartitions=10)

您可能已经知道,这是不起作用的,因为 sql 参数必须是可选的sqlalchemy,更重要的是, textclause 不支持。

然后,我将 Select 之后的查询包裹起来:

import dask.dataFrame as dd
from sqlalchemy import sql

query = "SELECT name, age, date_of_birth from customer"
sa_query = sql.select(sql.text(query))
df = dd.read_sql_query(sql=sa_query, con=con_string, index_col="name")

这也失败了,我一直试图解决的错误错误。问题在于,Dask需要推断列的类型,并且它通过读取第一个 head_row 行中的第5行-5行默认情况下来,并推断出那里的类型。 this line> line 在dask codebase中在查询中添加一个限制?,最终是

SELECT name, age, date_of_birth from customer LIMIT param_1

param_1 根本无法用正确的值代替-5在这种情况下。然后在下一行中失败,,TJAT评估SQL表达式。

sqlalchemy.exc.ProgrammingError: (mariadb.ProgrammingError) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'SELECT name, age, date_of_birth from customer 
 LIMIT ?' at line 1
[SQL: SELECT SELECT name, age, date_of_birth from customer 
 LIMIT ?]
[parameters: (5,)]
(Background on this error at: https://sqlalche.me/e/14/f405)

我不明白为什么 param_1 没有用head_rows的值代替。从错误消息中可以看到,它检测到有一个参数需要用于替换,但由于某种原因,它实际上无法替代它。

也许,我无法正确创建可选的sqlalchemy?

我可以简单地使用 pandas.read_sql 并从生成的pandas dataframe创建一个dask数据框,但这却失败了首先使用dask的目的。

我有以下限制:

  • 我无法更改功能以接受现成的sqlalchemy 可选。此功能将添加到使用的私人库中 我的公司和使用此库的各种项目不使用 Sqlalchemy。
  • meta 传递给自定义功能不是一个选项,因为它需要呼叫者确实创建它。但是,将 meta 将属性属于 read_sql_query 和设置 head_rows = 0 只要有一个有效的方法来检索/创建
  • 时< a href =“ https://dask-sql.readthedocs.io/en/latest/” rel =“ nofollow noreferrer”> dask-sql 可能对这种情况有效,使用它不是一个 选项,不幸的是,

如何正确地将SQL查询读取到DASK数据框架中?

I'm trying create a function that takes an SQL SELECT query as a parameter and use dask to read its results into a dask DataFrame using the dask.read_sql_query function. I am new to dask and to SQLAlchemy.
I first tried this:

import dask.dataFrame as dd

query = "SELECT name, age, date_of_birth from customer"
df = dd.read_sql_query(sql=query, con=con_string, index_col="name", npartitions=10)

As you probably already know, this won't work because the sql parameter has to be an SQLAlchemy selectable and more importantly, TextClause isn't supported.

I then wrapped the query behind a select like this:

import dask.dataFrame as dd
from sqlalchemy import sql

query = "SELECT name, age, date_of_birth from customer"
sa_query = sql.select(sql.text(query))
df = dd.read_sql_query(sql=sa_query, con=con_string, index_col="name")

This fails too with a very weird error that I have been trying to solve. The problem is that dask needs to infer the types of the columns and it does so by reading the first head_row rows in the table - 5 rows by default - and infer the types there. This line in the dask codebase adds a LIMIT ? to the query, which ends up being

SELECT name, age, date_of_birth from customer LIMIT param_1

The param_1 doesn't get substituted at all with the right value - 5 in this case. It then fails on the next line, https://github.com/dask/dask/blob/main/dask/dataframe/io/sql.py#L119, tjat evaluates the SQL expression.

sqlalchemy.exc.ProgrammingError: (mariadb.ProgrammingError) You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'SELECT name, age, date_of_birth from customer 
 LIMIT ?' at line 1
[SQL: SELECT SELECT name, age, date_of_birth from customer 
 LIMIT ?]
[parameters: (5,)]
(Background on this error at: https://sqlalche.me/e/14/f405)

I can't understand why param_1 wasn't substituted with the value of head_rows. One can see from the error message that it detects there's a parameter that needs to be used for the substitution but for some reason it doesn't actually substitute it.

Perhaps, I didn't correctly create the SQLAlchemy selectable?

I can simply use pandas.read_sql and create a dask dataframe from the resulting pandas dataframe but that defeats the purpose of using dask in the first place.

I have the following constraints:

  • I cannot change the function to accept a ready-made sqlalchemy
    selectable. This feature will be added to a private library used at
    my company and various projects using this library do not use
    sqlalchemy.
  • Passing meta to the custom function is not an option because it would require the caller do create it. However, passing a meta attribute to read_sql_query and setting head_rows=0 is completely ok as long as there's an efficient way to retrieve/create
  • while dask-sql might work for this case, using it is not an
    option, unfortunately

How can I go about correctly reading an SQL query into dask dataframe?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(2

假装爱人 2025-02-08 08:15:16

问题的症结在于这一行:

sa_query = sql.select(sql.text(query))

发生的事情是我们正在构造一个嵌套的查询,
这可能会导致下游问题。

让我们首先创建一个测试数据库:

# create a test database (using https://stackoverflow.com/a/64898284/10693596)
from sqlite3 import connect

from dask.datasets import timeseries

con = "delete_me_test.sqlite"
db = connect(con)

# create a pandas df and store (timestamp is dropped to make sure
# that the index is numeric)
df = (
    timeseries(start="2000-01-01", end="2000-01-02", freq="1h", seed=0)
    .compute()
    .reset_index()
)
df.to_sql("ticks", db, if_exists="replace")

接下来,让我们尝试使事情与 pandas 没有 sqlalchemy

from pandas import read_sql_query

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
meta = read_sql_query(sql=query, con=con).set_index("index")

print(meta)
#          id    name         x         y
# index
# 0       998  Ingrid  0.760997 -0.381459
# 1      1056  Ingrid  0.506099  0.816477
# 2      1056   Laura  0.316556  0.046963

现在,让我们添加 sqlalchemy 函数:

from pandas import read_sql_query
from sqlalchemy.sql import text, select

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
sa_query = select(text(query))
meta = read_sql_query(sql=sa_query, con=con).set_index("index")
# OperationalError: (sqlite3.OperationalError) near "SELECT": syntax error
# [SQL: SELECT SELECT * FROM ticks LIMIT 3]
# (Background on this error at: https://sqlalche.me/e/14/e3q8)

注意由于运行 sqlalchemy.select 在现有查询上选择选择。这可能会引起问题。如何解决这个问题?通常,我认为将任意SQL查询转换为其 sqlalchemy 等效的方式没有一种安全,可靠的方法语句,您可以在将查询传递给 sqlalchemy.select

from dask.dataframe import read_sql_query
from sqlalchemy.sql import select, text

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks"


def _remove_leading_select_from_query(query):
    if query.startswith("SELECT "):
        return query.replace("SELECT ", "", 1)
    else:
        return query


sa_query = select(text(_remove_leading_select_from_query(query)))
ddf = read_sql_query(sql=sa_query, con=con, index_col="index")

print(ddf)
print(ddf.head(3))
# Dask DataFrame Structure:
#                   id    name        x        y
# npartitions=1
# 0              int64  object  float64  float64
# 23               ...     ...      ...      ...
# Dask Name: from-delayed, 2 tasks
#          id    name         x         y
# index
# 0       998  Ingrid  0.760997 -0.381459
# 1      1056  Ingrid  0.506099  0.816477
# 2      1056   Laura  0.316556  0.046963

The crux of the problem is this line:

sa_query = sql.select(sql.text(query))

What is happening is that we are constructing a nested SELECT query,
which can cause a problem downstream.

Let's first create a test database:

# create a test database (using https://stackoverflow.com/a/64898284/10693596)
from sqlite3 import connect

from dask.datasets import timeseries

con = "delete_me_test.sqlite"
db = connect(con)

# create a pandas df and store (timestamp is dropped to make sure
# that the index is numeric)
df = (
    timeseries(start="2000-01-01", end="2000-01-02", freq="1h", seed=0)
    .compute()
    .reset_index()
)
df.to_sql("ticks", db, if_exists="replace")

Next, let's try to get things working with pandas without sqlalchemy:

from pandas import read_sql_query

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
meta = read_sql_query(sql=query, con=con).set_index("index")

print(meta)
#          id    name         x         y
# index
# 0       998  Ingrid  0.760997 -0.381459
# 1      1056  Ingrid  0.506099  0.816477
# 2      1056   Laura  0.316556  0.046963

Now, let's add sqlalchemy functions:

from pandas import read_sql_query
from sqlalchemy.sql import text, select

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks LIMIT 3"
sa_query = select(text(query))
meta = read_sql_query(sql=sa_query, con=con).set_index("index")
# OperationalError: (sqlite3.OperationalError) near "SELECT": syntax error
# [SQL: SELECT SELECT * FROM ticks LIMIT 3]
# (Background on this error at: https://sqlalche.me/e/14/e3q8)

Note the SELECT SELECT due to running sqlalchemy.select on an existing query. This can cause problems. How to fix this? In general, I don't think there's a safe and robust way of transforming arbitrary SQL queries into their sqlalchemy equivalent, but if this is for an application where you know that users will only run SELECT statements, you can manually sanitize the query before passing it to sqlalchemy.select:

from dask.dataframe import read_sql_query
from sqlalchemy.sql import select, text

con = "sqlite:///test.sql"
query = "SELECT * FROM ticks"


def _remove_leading_select_from_query(query):
    if query.startswith("SELECT "):
        return query.replace("SELECT ", "", 1)
    else:
        return query


sa_query = select(text(_remove_leading_select_from_query(query)))
ddf = read_sql_query(sql=sa_query, con=con, index_col="index")

print(ddf)
print(ddf.head(3))
# Dask DataFrame Structure:
#                   id    name        x        y
# npartitions=1
# 0              int64  object  float64  float64
# 23               ...     ...      ...      ...
# Dask Name: from-delayed, 2 tasks
#          id    name         x         y
# index
# 0       998  Ingrid  0.760997 -0.381459
# 1      1056  Ingrid  0.506099  0.816477
# 2      1056   Laura  0.316556  0.046963
好倦 2025-02-08 08:15:16

我在尝试解决方案时遇到了相同的限制,但是在进行文档后,我意识到问题可能源于SQL Alchemy Select Select子句的错误构造。这是为我解决问题的修订代码:

import dask.dataframe as dd
from sqlalchemy import sql

        
        conn_str = "you sqlalchemy connection string here"
        id = sql.column("numeric_pk_id_column")
        col1 = sql.column("some_other_column")
        table = sql.table("table", schema = "schema")


        select_statement = sql.select([id,
                                       col1]
                                      ).select_from(table)

        df = dd.read_sql_query(select_statement, conn_str, index_col=id)

https://www.oreilly.com/library/view/scaling-python-with/9781098119867/ch04.html
示例4-4

I encountered the same limitation while attempting the solution, but after delving into the documentation, I realized that the issue might stem from an incorrect construction of the SQL Alchemy select clause. Here's the revised code that resolved the problem for me:

import dask.dataframe as dd
from sqlalchemy import sql

        
        conn_str = "you sqlalchemy connection string here"
        id = sql.column("numeric_pk_id_column")
        col1 = sql.column("some_other_column")
        table = sql.table("table", schema = "schema")


        select_statement = sql.select([id,
                                       col1]
                                      ).select_from(table)

        df = dd.read_sql_query(select_statement, conn_str, index_col=id)

ref: https://www.oreilly.com/library/view/scaling-python-with/9781098119867/ch04.html
Example 4-4

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文