Why can't APScheduler's SQLAlchemyJobStore pickle the job, while the default in-memory job store can?

Posted 2025-01-23 12:45:59


I have a job that involves SQLite database operations, and I use APScheduler to run it for me in the background. It works fine with the default in-memory job store, but with SQLAlchemyJobStore it throws the following error:

  File "/usr/lib/python3.8/site-packages/apscheduler/jobstores/sqlalchemy.py", line 95, in add_job
    'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
_pickle.PicklingError: Can't pickle <class 'sqlalchemy.orm.decl_api.Base'>: attribute lookup Base on sqlalchemy.orm.decl_api failed

Here is how I initialise the scheduler:

class DataManager:
    
    def __init__(self, periodic_refresh_db=False):
        self.data_hdl = DataHandler()
        self.db = Database(Base)
        self.scheduler = BackgroundScheduler(
            jobstores={
                'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
        self.refresh_interval_in_hr = 1
        self.scheduler.start()
        if periodic_refresh_db:
            self.start_periodic_refresh_db()
    
    def refresh_db(self):
        self.data_hdl.fetch_data()
        self.db.delete_unoccupied_nft()
        
        db_ready_data = self.data_hdl.db_ready_data
        db_ready_data = db_ready_data[~db_ready_data["slug"].isin(self.db.remain_slugs)]
        
        db_ready_data.to_sql(
            name = NftCollection.__tablename__,
            con = self.db.engine,
            if_exists="append",
            index = False
        )

NftCollection is an ORM object, defined as follows:

class NftCollection(Base):

    __tablename__ = "nft_collection"
    __table_args__ = {'sqlite_autoincrement': True}

    id = Column(Integer, primary_key=True, nullable=False)
    slug = Column(String(30), nullable=False)
    name = Column(String, nullable=False)
    floor = Column(Float, nullable=False, default=0.0)
    vol_1day = Column(Integer, nullable=False, default=0)
    vol_7day = Column(Integer, nullable=False, default=0)
    occupier = Column(String(100), nullable=True)
    occupy_min = Column(Integer, nullable=True)
    occupy_until = Column(DateTime, nullable=True)
    lock = Column(Boolean, default=False)

start_periodic_refresh_db involves a database operation; it is written as follows:

def start_periodic_refresh_db(self):
    job = self.scheduler.add_job(
        func=self.refresh_db,
        trigger="interval",
        hours=self.refresh_interval_in_hr,
        id=self.refresh_db.__name__,
        replace_existing=True,
        max_instances=1,
        misfire_grace_time=None,
        name="refresh database by crawling",
        next_run_time=datetime.now()
    )
    print(f"refresh_db scheduled periodically, next run time: {job.next_run_time}")

The above code will not work unless I remove the jobstores argument, i.e. use the default MemoryJobStore.

Why is that, and how can I make the SQLAlchemy job store behave as expected? Thank you.


Comments (1)

傾城如夢未必闌珊 2025-01-30 12:45:59


This is because MemoryJobStore does not pickle anything. Quoting the documentation:

MemoryJobStore stores jobs in memory as-is, without serializing them. This allows you to schedule callables that are unreachable globally and to use non-serializable job arguments.
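To make the failure concrete: `declarative_base()` creates the `Base` class at runtime, so its `__module__` claims `sqlalchemy.orm.decl_api` even though no `Base` attribute is importable from there, and pickle serializes classes by reference (module + qualname). SQLAlchemyJobStore pickles the job state, which includes the bound method `self.refresh_db`; pickling a bound method also pickles `self`, and with it `self.db = Database(Base)`. A minimal sketch of this mechanism, with no SQLAlchemy dependency (`DataManagerLike` is a hypothetical stand-in, and the mismatched `__module__` is simulated with `type()`):

```python
import pickle

# Simulate what declarative_base() produces: a class created at runtime whose
# __module__ points somewhere the class cannot actually be looked up.
Base = type("Base", (), {})
Base.__module__ = "sqlalchemy.orm.decl_api"  # not importable from there

class DataManagerLike:
    """Hypothetical stand-in for the DataManager in the question."""
    def __init__(self):
        self.base = Base  # instance state captures the class, like self.db = Database(Base)

    def refresh_db(self):
        pass

err = None
try:
    # Pickling a bound method also pickles self, and with it self.base --
    # roughly what SQLAlchemyJobStore.add_job does with the job state.
    pickle.dumps(DataManagerLike().refresh_db)
except pickle.PicklingError as exc:
    err = exc

print(type(err).__name__, err)

# A plain module-level function pickles fine (by reference), which is why
# scheduling a top-level function works with persistent job stores.
def refresh_db():
    pass

assert pickle.loads(pickle.dumps(refresh_db)) is refresh_db
```

So the usual fix is to schedule a module-level function (or a textual `"module:function"` reference, which `add_job` also accepts) that reconstructs whatever state it needs inside the job body, rather than a bound method whose instance holds unpicklable objects.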