Cassandra Python driver does not fetch the next page of results

Posted on 2025-01-28 12:25:26

DataStax driver for Cassandra, version 3.25.0
Python version 3.9

Session.execute() fetches the first 100 records. According to the documentation, the driver is supposed to transparently fetch the next pages as we reach the end of the first page. However, it fetches the same page again and again, so the first 100 records are all that is ever accessible.

The for loop that prints the records never terminates.

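from ssl import CERT_NONE, PROTOCOL_TLS_CLIENT, SSLContext

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

# db_host, db_port, db_user and db_pwd come from the application's configuration.
# Assumed setup (not shown in the original snippet): a client TLS context
# with certificate verification disabled.
ssl_context = SSLContext(PROTOCOL_TLS_CLIENT)
ssl_context.check_hostname = False  # must be turned off before CERT_NONE is allowed
ssl_context.verify_mode = CERT_NONE

cluster = Cluster(contact_points=[db_host], port=db_port,
                  auth_provider=PlainTextAuthProvider(db_user, db_pwd),
                  ssl_context=ssl_context)
session = cluster.connect()

query = "SELECT * FROM content_usage"
statement = SimpleStatement(query, fetch_size=100)
results = session.execute(statement)

# Iterating the ResultSet is expected to fetch later pages transparently
for row in results:
    print(f"{row}")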
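
To make the reported symptom concrete, here is a minimal in-memory sketch (not the driver's code) of how transparent paging is generally expected to work: the iterator yields the current page, then requests the next page using the paging state returned by the previous response, and stops when no state comes back. The names fetch_page, TABLE and the integer offset used as a paging state are hypothetical stand-ins for a server round trip. If the state is never advanced, every request returns page one again, which matches the "same 100 rows forever" behaviour described above.

```python
from typing import Iterator, List, Optional, Tuple

# Hypothetical stand-in for the server: 250 rows served 100 at a time.
TABLE: List[int] = list(range(250))
FETCH_SIZE = 100


def fetch_page(paging_state: Optional[int]) -> Tuple[List[int], Optional[int]]:
    """Return one page of rows plus the state needed to request the next page.

    The state is just an offset here; a real paging state is an opaque token.
    """
    start = paging_state or 0
    end = start + FETCH_SIZE
    next_state = end if end < len(TABLE) else None
    return TABLE[start:end], next_state


def iterate_transparently() -> Iterator[int]:
    """Yield every row, fetching the next page only once the current one is used up."""
    rows, state = fetch_page(None)
    while True:
        yield from rows
        if state is None:  # no paging state returned: this was the last page
            return
        rows, state = fetch_page(state)


def iterate_without_advancing(max_pages: int) -> Iterator[int]:
    """Buggy variant: always asks for the first page, so it never finishes on its own.

    max_pages exists only so that this demonstration terminates.
    """
    for _ in range(max_pages):
        rows, _state = fetch_page(None)  # the returned state is ignored
        yield from rows


all_rows = list(iterate_transparently())
print(len(all_rows), all_rows[-1])  # 250 249
stuck = list(iterate_without_advancing(max_pages=3))
print(len(stuck), stuck[100])  # 300 0
```

The correct iterator touches each row exactly once; the buggy one repeats rows 0 to 99 for as long as it is allowed to run.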

I found other similar threads, but none of them have been answered either. Has anyone encountered this issue before? Any help is appreciated.

Comments (3)

尝蛊 2025-02-04 12:25:26

I'm a bit confused by the initial statement of the problem. You mentioned that the first page of results is fetched repeatedly and that these are the only results available to your program. You also said that the for loop that prints the results becomes an infinite loop when you run the program. These statements seem contradictory to me: if you never get any output, how can you know what the driver has fetched? I'm assuming that's what you meant by "goes infinite"; if I'm wrong, please correct me.

The following code seems to run as expected against Cassandra 4.0.0 using cassandra-driver 3.25.0 on Python 3.9.0:

import argparse
import logging
import time

from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

def setupLogging():
    log = logging.getLogger()
    log.setLevel('DEBUG')

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    log.addHandler(handler)

def setupSchema(session):
    session.execute("""create keyspace if not exists "baz" with replication = {'class':'SimpleStrategy', 'replication_factor':1};""")
    session.execute("""create table if not exists baz.qux (run_ts bigint, idx int, uuid timeuuid, primary key (run_ts,idx))""")
    session.execute("""truncate baz.qux""")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('-d','--debug', action='store_true')
    args = parser.parse_args()

    if args.debug:
        # Enable logging before connecting so connection setup is logged too
        setupLogging()

    cluster = Cluster()
    session = cluster.connect()
    setupSchema(session)

    run_ts = int(time.time())
    insert_stmt = session.prepare("""insert into baz.qux (run_ts,idx,uuid) values (?,?,now())""")
    for idx in range(10000):
        session.execute(insert_stmt, [run_ts, idx])

    query = "select * from baz.qux"
    stmt = SimpleStatement(query, fetch_size=100)
    results = session.execute(stmt)

    for row in results:
        print(f"{row}")

    cluster.shutdown()

$ time (python foo.py | wc -l)
10000

real    0m12.452s
user    0m3.786s
sys     0m2.197s

You might try running your sample app with debug logging enabled (see sample code above for how to enable this). It sounds like something might be off in your Cassandra configuration (or perhaps your client setup); the additional logging might help you identify what (if anything) is getting in the way.
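
The sample above sets DEBUG on the root logger, which also captures output from every other library in the process. A narrower variant, sketched here under the assumption that the driver's modules log through loggers named after their module paths (for example cassandra.cluster), configures only the parent "cassandra" logger. The helper name enable_driver_debug_logging is hypothetical. Call it before creating the Cluster so that connection setup is logged as well.

```python
import logging


def enable_driver_debug_logging() -> logging.Logger:
    """Route DEBUG records from the "cassandra" logger hierarchy to stderr."""
    driver_log = logging.getLogger("cassandra")
    driver_log.setLevel(logging.DEBUG)
    if not driver_log.handlers:  # avoid duplicate output if called twice
        handler = logging.StreamHandler()  # writes to sys.stderr by default
        handler.setFormatter(
            logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
        driver_log.addHandler(handler)
    return driver_log
```

Child loggers such as "cassandra.cluster" inherit the DEBUG level from the parent. Records still propagate to the root logger, so if the application also configures a root handler, setting propagate = False on the returned logger avoids printing each line twice.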

命比纸薄 2025-02-04 12:25:26

Below is the code snippet that finally worked for me, after restricting the driver version to 3.20:

# query, params and process_row() are defined elsewhere in the application
statement = session.prepare(query)

# Execute the query once and retrieve the first page of results
results = session.execute(statement, params)
for row in results.current_rows:
    process_row(row)

# Keep fetching pages, passing the paging state from the latest response,
# until the server reports that no pages remain
while results.has_more_pages:
    page_state = results.paging_state
    results = session.execute(statement, parameters=params, paging_state=page_state)
    for row in results.current_rows:
        process_row(row)
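
The same loop can be wrapped in a generator so that callers simply iterate over rows. This is a sketch, not part of the poster's solution: iter_all_rows is a hypothetical helper name, and it relies only on the ResultSet attributes the snippet above already uses (current_rows, has_more_pages, paging_state) plus the paging_state argument of Session.execute().

```python
from typing import Any, Iterator, Optional, Sequence


def iter_all_rows(session: Any, statement: Any,
                  params: Optional[Sequence[Any]] = None) -> Iterator[Any]:
    """Yield every row of a paged query by following paging_state by hand.

    session is expected to behave like a cassandra-driver Session whose
    execute() returns a ResultSet exposing current_rows, has_more_pages
    and paging_state.
    """
    # First request: no paging state, so the server returns page one.
    results = session.execute(statement, params)
    yield from results.current_rows
    while results.has_more_pages:
        # Always pass the state from the most recent response; resending an
        # older state would make the server return the same page again.
        results = session.execute(statement, params,
                                  paging_state=results.paging_state)
        yield from results.current_rows
```

Usage would look like "for row in iter_all_rows(session, statement, params): process_row(row)". Because it is a generator, each additional page is requested only after the caller has consumed the previous one.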

被翻牌 2025-02-04 12:25:26

Your code calls execute() only once, so results will only ever contain the same list of 100 rows.

You need to call execute() as part of your loop to get the next page of results, like this:

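from cassandra.query import SimpleStatement

# session is an existing Session; process_row() is your own row handler
query = "SELECT * FROM content_usage"
statement = SimpleStatement(query, fetch_size=100)

for row in session.execute(statement):
    process_row(row)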

For more info, see Paging with the Python driver. Cheers!
