Writing to a SQLite DB in a thread

A disclaimer that I'm a concurrency/parallelism novice (still learning the fundamentals) and thanks in advance.

I want to:

  1. pull data from an API
  2. write it to a file
  3. set a flag in a SQLite database so I don't pull it again

Current Single-Threaded Process

My basic workflow is:

import sqlite3, requests, tqdm, os, gzip
from ratelimit import limits, sleep_and_retry

DB = './my_db.sqlite3'
# BASE_DIR and jobs (a list of (pk, url, is_downloaded) tuples) are
# defined elsewhere in the script

@sleep_and_retry
@limits(calls=9, period=1)
def download(job, conn):
    pk, url, is_downloaded = job
    res = requests.get(url)
    filename = BASE_DIR + f'/{pk}.txt.gz'
    if res.status_code == 200:
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with gzip.open(filename, 'wb') as f:
            f.write(res.content)
        update_db(pk, conn)
    else:
        raise Exception('crawl error')

def update_db(pk, conn):
    with conn:  # the connection context manager commits on success
        c = conn.cursor()
        c.execute('update my_table set is_downloaded = 1 where id = ?;', (pk,))

conn = sqlite3.connect(DB)
with tqdm.tqdm(total=len(jobs)) as progress:
    for job in jobs:
        download(job, conn)
        progress.update()
conn.close()

This runs slowly, presumably due to the I/O of the API calls and the database writes; I hope it could be sped up through some sort of concurrency. I have questions about the theory/wisdom of this as well as the implementation.

The implementation question

I tried to implement this using the concurrent.futures package:

import concurrent.futures

# Using the same download and update_db funcs as above.
# Note: a single connection object is shared by every worker thread.
conn = sqlite3.connect(DB)
futures = []
with tqdm.tqdm(total=len(jobs)) as progress:
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as exe:
        for job in jobs:
            f = exe.submit(download, job, conn)
            futures.append(f)
            f.add_done_callback(lambda x: progress.update())

        for f in concurrent.futures.as_completed(futures):
            if f.exception():
                raise Exception(f.exception())

When I implemented this without the DB write, it was notably faster, which was reassuring. However, when I added the DB write back in, it threw an exception: Exception: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 4336387456 and this is thread id 6202470400.

What's the proper way to write to a DB in a multithreaded app like this? I read that SQLite can support multithreading, but don't feel knowledgeable enough to apply what this means: "provided that no single database connection is used simultaneously in two or more threads."
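
My current reading of that sentence is that each thread may open its own connection, as long as no one connection is shared across threads. Here is a minimal sketch of what I think that would look like (the threading.local() pattern is my guess, not something the SQLite docs prescribe):

import sqlite3, threading

DB = './my_db.sqlite3'
_local = threading.local()  # per-thread storage for connections

def get_conn():
    # Lazily open one connection per worker thread and reuse it, so no
    # single connection is ever used from two threads at once.
    if not hasattr(_local, 'conn'):
        _local.conn = sqlite3.connect(DB)
    return _local.conn

def update_db(pk):
    conn = get_conn()
    with conn:  # commits on success, rolls back on exception
        conn.execute('update my_table set is_downloaded = 1 where id = ?;', (pk,))

Is that the intended pattern, or is there a more standard approach?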

The theory question

I'm still confused by the differences between multithreading and multiprocessing, especially in the context of Python's GIL and my use case, which is presumably network- and disk-I/O heavy. I believe multithreading is the right approach, and am seeking confirmation. Is what I am doing the right approach?

I read that threads share the same memory space, while processes do not. If this is the case, why do I get an error from my SQLite client saying Exception: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 4336387456 and this is thread id 6202470400?
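
The closest thing I've found in the sqlite3 docs is the check_same_thread argument to sqlite3.connect(), which seems to disable that per-thread ownership check and leave it to the caller to serialize access. My use of a lock here is an assumption on my part, not something I've seen documented as the right approach:

import sqlite3, threading

DB = './my_db.sqlite3'
write_lock = threading.Lock()
# check_same_thread=False turns off the check that raised the exception
# above; the caller must then ensure the connection is never used from
# two threads at the same time.
conn = sqlite3.connect(DB, check_same_thread=False)

def update_db(pk):
    with write_lock:  # serialize all access to the shared connection
        with conn:    # commit on success, roll back on exception
            conn.execute('update my_table set is_downloaded = 1 where id = ?;', (pk,))

Would that be safe here, or is one-connection-per-thread the preferred pattern?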
