Writing to a sqlite db from threads
A disclaimer: I'm a concurrency/parallelism novice (still learning the fundamentals), and thanks in advance.
I want to:
- pull data from an API
- write it to a file
- set a flag in a SQLite database so I don't pull it again
Current Single Threaded Process
My basic workflow is:
import sqlite3, requests, tqdm, os, gzip
from ratelimit import limits, sleep_and_retry

DB = './my_db.sqlite3'
# BASE_DIR (output directory) and jobs (a list of (pk, url, is_downloaded)
# tuples) are defined elsewhere in my script.

@sleep_and_retry
@limits(calls=9, period=1)
def download(job, conn):
    pk, url, is_downloaded = job
    res = requests.get(url)
    filename = BASE_DIR + f'/{pk}.txt.gz'
    if res.status_code == 200:
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        with gzip.open(filename, 'wb') as f:
            f.write(res.content)
        update_db(pk, conn)
    else:
        raise Exception('crawl error')

def update_db(pk, conn):
    with conn:
        c = conn.cursor()
        c.execute('''update my_table set is_downloaded = 1 where id = ?;''', (pk,))
        conn.commit()
    return

conn = sqlite3.connect(DB)
with tqdm.tqdm(total=len(jobs)) as progress:
    for job in jobs:
        download(job, conn)
        progress.update()
conn.close()
This runs slowly, presumably due to the I/O of the API calls and database writes; I hope it could be sped up through some sort of concurrency. I have questions about the theory/wisdom of this as well as the implementation.
The implementation question
I tried to implement this using the concurrent.futures package:
import concurrent.futures

# Using same download and update_db funcs as above

conn = sqlite3.connect(DB)
futures = []

with tqdm.tqdm(total=len(jobs)) as progress:
    with concurrent.futures.ThreadPoolExecutor(max_workers=16) as exe:
        for job in jobs:
            f = exe.submit(download, job, conn)
            futures.append(f)
            f.add_done_callback(lambda x: progress.update())
    for f in concurrent.futures.as_completed(futures):
        if f.exception():
            raise Exception(f.exception())
When I implemented this without the DB write, it was notably faster, which was reassuring. However, when I added in the DB write, it threw an exception: Exception: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 4336387456 and this is thread id 6202470400.
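From what I can tell, the problem is simply that the connection object is created on the main thread and then handed to the worker threads. Here is a minimal sketch of what I think is going on (my assumption, not my real code):

import sqlite3, threading

conn = sqlite3.connect('./my_db.sqlite3')  # created on the main thread

def worker():
    # Raises sqlite3.ProgrammingError: "SQLite objects created in a thread
    # can only be used in that same thread..." because conn belongs to the
    # main thread.
    conn.execute('select 1')

t = threading.Thread(target=worker)
t.start()
t.join()

I also see that sqlite3.connect() accepts check_same_thread=False, which disables this check, but if I understand the docs correctly that only skips the check; it doesn't make it safe to use one connection from several threads simultaneously.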
What's the proper way to write to a DB in a multithreaded app like this? I read that SQLite can support multithreading, but I don't feel knowledgeable enough to apply what this means: "provided that no single database connection is used simultaneously in two or more threads."
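Is the right pattern therefore to give each worker thread its own connection? For example, something along these lines (a sketch only; get_conn and tls are names I made up for illustration, while DB and my_table are from my code above):

import sqlite3, threading

tls = threading.local()  # thread-local storage, one slot per thread

def get_conn():
    # Open a connection the first time each worker thread needs one,
    # so no single connection is ever shared between threads.
    if not hasattr(tls, 'conn'):
        tls.conn = sqlite3.connect(DB)
    return tls.conn

def update_db(pk):
    conn = get_conn()
    with conn:  # commits on success, rolls back on exception
        conn.execute('update my_table set is_downloaded = 1 where id = ?;', (pk,))

Or is the better approach to keep a single writer, i.e. have the download workers push finished pk values onto a queue.Queue and have one dedicated thread do all the UPDATEs?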
The Theory question
I'm still confused by the differences between multithreading and multiprocessing, especially in the context of Python's GIL and my use case, which is presumably network- and disk-I/O heavy. I believe multithreading is the right approach and am seeking confirmation: is what I am doing the right approach?
I read that threads share the same memory space, while processes do not. If that's the case, why do I get an error from my SQLite client saying "SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 4336387456 and this is thread id 6202470400"?