Why can't APScheduler's SQLAlchemyJobStore pickle jobs, while the default MemoryJobStore can?
I have a job that involves SQLite database operations, and I use APScheduler to run it for me in the background. It works fine with the default in-memory job store, but with the SQLAlchemyJobStore it throws the following error:
File "/usr/lib/python3.8/site-packages/apscheduler/jobstores/sqlalchemy.py", line 95, in add_job
'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
_pickle.PicklingError: Can't pickle <class 'sqlalchemy.orm.decl_api.Base'>: attribute lookup Base on sqlalchemy.orm.decl_api failed
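The failure can be reproduced with the standard library alone, without APScheduler. The sketch below is hypothetical (the `Database`/`DataManager` names are stand-ins for the classes shown later): pickling a bound method pickles the whole instance behind it, and a class created at runtime, like the `Base` returned by SQLAlchemy's `declarative_base()`, cannot be resolved by attribute lookup on its module, so pickle gives up:

```python
import pickle

# A class created at runtime: its declared name ("NotFindable") is not an
# attribute of this module, just like the Base that declarative_base()
# returns, so pickle cannot store a by-name reference to it.
Hidden = type("NotFindable", (), {})

class Database:
    def __init__(self, base):
        self.base = base  # holds the runtime-created class

class DataManager:
    def __init__(self):
        self.db = Database(Hidden)

    def refresh_db(self):
        pass

mgr = DataManager()
try:
    # Pickling the bound method pickles mgr itself, and with it every
    # attribute mgr references, including the unpicklable class.
    pickle.dumps(mgr.refresh_db)
    raised = False
except pickle.PicklingError:
    raised = True

print("PicklingError raised:", raised)
```

This mirrors the traceback above: `SQLAlchemyJobStore.add_job` calls `pickle.dumps(job.__getstate__(), ...)`, and the job state includes the callable, here a bound method whose instance drags the `Base` class along.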
Here is how I initialise the scheduler:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

class DataManager:
    def __init__(self, periodic_refresh_db=False):
        self.data_hdl = DataHandler()
        self.db = Database(Base)
        self.scheduler = BackgroundScheduler(
            jobstores={
                'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
        self.refresh_interval_in_hr = 1
        self.scheduler.start()
        if periodic_refresh_db:
            self.start_periodic_refresh_db()

    def refresh_db(self):
        self.data_hdl.fetch_data()
        self.db.delete_unoccupied_nft()
        db_ready_data = self.data_hdl.db_ready_data
        db_ready_data = db_ready_data[~db_ready_data["slug"].isin(self.db.remain_slugs)]
        db_ready_data.to_sql(
            name=NftCollection.__tablename__,
            con=self.db.engine,
            if_exists="append",
            index=False
        )
NftCollection is an ORM model, defined as follows:
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean

class NftCollection(Base):
    __tablename__ = "nft_collection"
    __table_args__ = {'sqlite_autoincrement': True}

    id = Column(Integer, primary_key=True, nullable=False)
    slug = Column(String(30), nullable=False)
    name = Column(String, nullable=False)
    floor = Column(Float, nullable=False, default=0.0)
    vol_1day = Column(Integer, nullable=False, default=0.0)
    vol_7day = Column(Integer, nullable=False, default=0.0)
    occupier = Column(String(100), nullable=True)
    occupy_min = Column(Integer, nullable=True)
    occupy_until = Column(DateTime, nullable=True)
    lock = Column(Boolean, default=False)
start_periodic_refresh_db involves the database operation; it is written as follows:
    def start_periodic_refresh_db(self):
        self.scheduler.add_job(
            func=self.refresh_db,
            trigger="interval",
            hours=self.refresh_interval_in_hr,
            id=self.refresh_db.__name__,
            replace_existing=True,
            max_instances=1,
            misfire_grace_time=None,
            name="refresh database by crawling",
            next_run_time=datetime.now()
        )
        print(f"refresh_db scheduled periodically, next run time: {self.next_refresh_time}")
The above code does not work unless I remove the jobstores argument, i.e. use the default MemoryJobStore. Why is that, and how can I make the SQLAlchemy job store behave as expected? Thank you.
Comments (1)
This is because MemoryJobStore does not pickle anything. Quote from the documentation:
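A persistent store such as SQLAlchemyJobStore, by contrast, has to serialize the job, so the callable and its arguments must be picklable. The usual remedy, shown here as a hedged stdlib-only sketch rather than the author's exact code, is a module-level function: pickle stores it as a mere module-plus-name reference, with no instance state at all:

```python
import pickle

def refresh_db():
    # A top-level function: pickle records only its module and qualified
    # name, so nothing unpicklable is dragged into the job state.
    return "refreshed"

payload = pickle.dumps(refresh_db)  # a tiny by-name reference, not code
restored = pickle.loads(payload)

print(restored is refresh_db)
print(restored())
```

Applied to the question, that means giving add_job a module-level function (or a textual "mymodule:refresh_db" reference, which APScheduler also accepts) instead of the bound method self.refresh_db, and reconstructing any heavyweight objects, such as DataManager, inside the job function itself.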