SQLAlchemy - WHERE 子句中的子查询

发布于 2024-11-11 17:58:49 字数 840 浏览 2 评论 0原文

我最近刚刚开始使用 SQLAlchemy,但仍然无法理解某些概念。

归结为基本要素,我有两个这样的表(这是通过 Flask-SQLAlchemy 实现的):

class User(db.Model):
    __tablename__ = 'users'
    user_id = db.Column(db.Integer, primary_key=True)

class Posts(db.Model):
    __tablename__ = 'posts'
    post_id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.user_id'))
    post_time = db.Column(db.DateTime)

    user = db.relationship('User', backref='posts')

我将如何查询用户列表及其最新帖子(不包括没有帖子的用户)。如果我使用 SQL,我会这样做:

SELECT [whatever]
FROM posts AS p
    LEFT JOIN users AS u ON u.user_id = p.user_id
WHERE p.post_time = (SELECT MAX(post_time) FROM posts WHERE user_id = u.user_id)

所以我确切地知道“期望的”SQL 以获得我想要的效果,但不知道如何在 SQLAlchemy 中“正确”表达它。

编辑:如果它很重要,我使用的是 SQLAlchemy 0.6.6。

I've just recently started using SQLAlchemy and am still having trouble wrapping my head around some of the concepts.

Boiled down to the essential elements, I have two tables like this (this is through Flask-SQLAlchemy):

class User(db.Model):
    __tablename__ = 'users'
    user_id = db.Column(db.Integer, primary_key=True)

class Posts(db.Model):
    __tablename__ = 'posts'
    post_id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.user_id'))
    post_time = db.Column(db.DateTime)

    user = db.relationship('User', backref='posts')

How would I go about querying for a list of users and their newest post (excluding users with no posts). If I was using SQL, I would do:

SELECT [whatever]
FROM posts AS p
    LEFT JOIN users AS u ON u.user_id = p.user_id
WHERE p.post_time = (SELECT MAX(post_time) FROM posts WHERE user_id = u.user_id)

So I know exactly the "desired" SQL to get the effect I want, but no idea how to express it "properly" in SQLAlchemy.

Edit: in case it's important, I'm on SQLAlchemy 0.6.6.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

九歌凝 2024-11-18 17:58:49

这应该可以工作(不同的 SQL,相同的结果):

t = Session.query(
    Posts.user_id,
    func.max(Posts.post_time).label('max_post_time'),
).group_by(Posts.user_id).subquery('t')

query = Session.query(User, Posts).filter(and_(
    User.user_id == Posts.user_id,
    User.user_id == t.c.user_id,
    Posts.post_time == t.c.max_post_time,
))

for user, post in query:
    print user.user_id, post.post_id

其中 c 代表“列”

This should work (different SQL, same result):

t = Session.query(
    Posts.user_id,
    func.max(Posts.post_time).label('max_post_time'),
).group_by(Posts.user_id).subquery('t')

query = Session.query(User, Posts).filter(and_(
    User.user_id == Posts.user_id,
    User.user_id == t.c.user_id,
    Posts.post_time == t.c.max_post_time,
))

for user, post in query:
    print user.user_id, post.post_id

Where c stands for 'columns'

红ご颜醉 2024-11-18 17:58:49

前面的答案有效,但您要求的确切sql也与实际语句一样编写:

print s.query(User, Posts).\
    outerjoin(Posts.user).\
    filter(Posts.post_time==\
        s.query(
            func.max(Posts.post_time)
        ).
        filter(Posts.user_id==User.user_id).
        correlate(User).
        as_scalar()
    )

我猜不一定明显的“概念”是当前需要 as_scalar() 来建立一个子查询作为“标量” (它可能应该从上下文中假设 ==)。

编辑:已确认,这是错误行为,已完成票#2190。在当前的提示或版本 0.7.2 中,自动调用 as_scalar() ,上面的查询可以是:

print s.query(User, Posts).\
    outerjoin(Posts.user).\
    filter(Posts.post_time==\
        s.query(
            func.max(Posts.post_time)
        ).
        filter(Posts.user_id==User.user_id).
        correlate(User)
    )

the previous answer works, but also the exact sql you asked for is written much as the actual statement:

print s.query(User, Posts).\
    outerjoin(Posts.user).\
    filter(Posts.post_time==\
        s.query(
            func.max(Posts.post_time)
        ).
        filter(Posts.user_id==User.user_id).
        correlate(User).
        as_scalar()
    )

I guess the "concept" that isn't necessarily apparent is that as_scalar() is currently needed to establish a subquery as a "scalar" (it should probably assume that from the context against ==).

Edit: Confirmed, that's buggy behavior, completed ticket #2190. In the current tip or release 0.7.2, the as_scalar() is called automatically and the above query can be:

print s.query(User, Posts).\
    outerjoin(Posts.user).\
    filter(Posts.post_time==\
        s.query(
            func.max(Posts.post_time)
        ).
        filter(Posts.user_id==User.user_id).
        correlate(User)
    )
丘比特射中我 2024-11-18 17:58:49

它的表达方式通常与实际的 SQL 类似 - 您创建一个返回单个结果的子查询并与其进行比较 - 但是有时真正的痛苦是如果您必须使用您已在查询或联接的子查询中的表。

解决方案是创建模型的别名版本以在子查询中引用。

因此,假设您已经在一个连接中进行操作,其中您有一个现有的 Posts model 和一些基本的 query 准备就绪 - 现在,您需要要查询每个用户的最新(单个)帖子列表,您可以像这样过滤查询:

from sqlalchemy.orm import aliased
posts2 = aliased(Posts) # create aliased version

query = query.filter(
    model.post_id
    ==
    Posts.query # create query directly from model, NOT from the aliased version!
        .with_entities(posts2.post_id) # only select column "post_id"
        .filter(
            posts2.user_id == model.user_id
        )
        .order_by(posts2.post_id.desc()) # assume higher id == newer post
        .limit(1) # we must limit to a single row so we only get 1 value
)

我故意不使用 func.max 因为我认为这是一个更简单的版本,它是已经在其他答案中,我认为这个示例对于通常发现这个问题的人很有用,因为他们正在寻找如何子查询同一个表的解决方案。

It is usually expressed similarly to the actual SQL - you create a subquery that returns single result and compare against that - however what sometimes can be real pain is if you have to use a table in the subquery that you are already querying or joining on.

Solution is to create an aliased version of the model to reference in the subquery.

So let's say you are already operating in a connection where you have an existing Posts model and some basic query ready - now, you'd want to query for the list of latest (single) post from each user, you'd filter the query like:

from sqlalchemy.orm import aliased
posts2 = aliased(Posts) # create aliased version

query = query.filter(
    model.post_id
    ==
    Posts.query # create query directly from model, NOT from the aliased version!
        .with_entities(posts2.post_id) # only select column "post_id"
        .filter(
            posts2.user_id == model.user_id
        )
        .order_by(posts2.post_id.desc()) # assume higher id == newer post
        .limit(1) # we must limit to a single row so we only get 1 value
)

I've purposedly did not use the func.max because I consider that a simpler version and it's already in other answers, this example I think will be useful to people that generally find this question because they are looking for a solution how to subquery the same table.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文