How can I make pymongo db calls faster?

Posted on 2025-01-09 18:25:50

I am using Motor. pymongo was my initial choice, but I switched to Motor because it is the asynchronous MongoDB driver for Python.

My aim is to issue a large number of MongoDB queries at the same time with minimal waiting time.

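There are about 1000 symbols, and for each symbol I have to query its latest candlestick data from MongoDB from time to time in order to perform certain calculations. I need to query the latest 5K documents for each symbol, so the collection contains roughly 1000 * 5000 = 5,000,000 documents.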
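A query of this shape (filter on `symbol`, newest first, about 5,000 documents out of roughly 5,000,000) generally depends on an index that covers both fields. Whether such an index already exists is not stated in this post, so the following is only a sketch under that assumption. The helper names `ensure_candle_index` and `latest_candles` are hypothetical; the field names `symbol` and `timeStamp` are taken from the code in this post, and `collection` is assumed to be a Motor collection object.

```python
# Compound index: equality match on symbol, then newest-first on timeStamp.
CANDLE_INDEX = [("symbol", 1), ("timeStamp", -1)]
CANDLES_PER_SYMBOL = 5000  # "latest 5K documents" from the question


async def ensure_candle_index(collection):
    """Create the compound index if it is missing (hypothetical helper)."""
    return await collection.create_index(CANDLE_INDEX)


async def latest_candles(collection, symbol, n=CANDLES_PER_SYMBOL):
    """Return up to n newest candles for one symbol (hypothetical helper)."""
    cursor = collection.find({"symbol": symbol}).sort("timeStamp", -1).limit(n)
    return await cursor.to_list(length=n)
```

The explicit `limit(n)` caps the result at the 5K documents the calculation needs, even if the collection later holds more history per symbol.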

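With Motor and asyncio, I use the following method to fetch documents asynchronously, but the code takes a really long time to run and I can't figure out why. I am using an 8-core CPU on a virtual machine.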
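A note on the 8-core remark: asyncio runs all coroutines of one event loop on a single thread, so extra cores do not speed up the calculation part by themselves. Since it is not yet clear where the time goes, one way to narrow it down is to time the database fetch and the signal calculation separately. Below is a minimal sketch using only the standard library; `timed` and `demo` are hypothetical names, and `asyncio.sleep` stands in for the real fetch and calculation calls.

```python
import asyncio
import time


async def timed(label, awaitable, totals):
    """Await `awaitable` and add its wall-clock duration to totals[label]."""
    start = time.perf_counter()
    try:
        return await awaitable
    finally:
        totals[label] = totals.get(label, 0.0) + time.perf_counter() - start


async def demo():
    totals = {}
    # asyncio.sleep is a placeholder for the database fetch and the calculation.
    await timed("fetch", asyncio.sleep(0.02, result=["candle"]), totals)
    await timed("signal", asyncio.sleep(0.01), totals)
    return totals


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

When many tasks run concurrently, the measured wall-clock time also includes time spent waiting for the event loop to get back to the task, so the numbers are easiest to interpret when measured for a single symbol first.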

Any help with this problem?

import asyncio

import motor.motor_asyncio


async def getCandleList(symbol):
    # Each symbol has about 5K latest candles in the collection.
    collection = db['candle_db']  # the collection holding the candles
    # Build the query directly; eval() and string formatting are not needed.
    cursor = collection.find({'symbol': symbol}).sort('timeStamp', -1)
    finalList = await cursor.to_list(length=None)
    return finalList


async def taskForEachSymbol(symbol):
    while True:
        candleList = await getCandleList(symbol)
        # generateSignal (defined elsewhere) generates certain signals in real time
        await generateSignal(candleList)


def getAllTasks():
    awaitableTasks = []
    # symbolList (defined elsewhere) contains around 1k symbols
    for symbol in symbolList:
        awaitableTasks.append(asyncio.create_task(taskForEachSymbol(symbol)))
    return awaitableTasks


async def mainTask():
    awaitableTasks = getAllTasks()
    await asyncio.gather(*awaitableTasks, return_exceptions=False)


def main():
    # A plain function: run_until_complete() must be called from synchronous code.
    mainLoop.run_until_complete(mainTask())
    print('completed! ... ')


if __name__ == '__main__':
    mainLoop = asyncio.new_event_loop()
    asyncio.set_event_loop(mainLoop)

    client = motor.motor_asyncio.AsyncIOMotorClient(io_loop=mainLoop)
    db = client.candles

    main()
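The code above starts a task for every symbol at once, and each task immediately issues a query. One possible variation is to cap how many queries are in flight with `asyncio.Semaphore`. The sketch below uses a stand-in coroutine `fake_fetch` instead of the real Motor query; the names `bounded_fetch` and `fetch_all` and the limit of 50 are hypothetical and chosen only for illustration, not as tuned recommendations.

```python
import asyncio


async def fake_fetch(symbol):
    """Stand-in for the real query; sleeps briefly and returns a marker."""
    await asyncio.sleep(0.001)
    return f"candles:{symbol}"


async def bounded_fetch(symbol, semaphore, fetch, stats):
    """Run fetch(symbol) while holding one of the semaphore's slots."""
    async with semaphore:
        stats["in_flight"] += 1
        stats["peak"] = max(stats["peak"], stats["in_flight"])
        try:
            return await fetch(symbol)
        finally:
            stats["in_flight"] -= 1


async def fetch_all(symbols, limit=50, fetch=fake_fetch):
    """Fetch every symbol with at most `limit` fetches in flight."""
    semaphore = asyncio.Semaphore(limit)
    stats = {"in_flight": 0, "peak": 0}
    results = await asyncio.gather(
        *(bounded_fetch(s, semaphore, fetch, stats) for s in symbols)
    )
    return results, stats["peak"]


if __name__ == "__main__":
    results, peak = asyncio.run(fetch_all([f"SYM{i}" for i in range(1000)]))
    print(len(results), "results, peak concurrency", peak)
```

`asyncio.gather` returns results in the same order as the input symbols, so the output can be matched back to `symbols` by position. The `stats` dictionary is there only to make the cap observable and can be dropped in real code.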

