与烧瓶施加工厂模式/枪手/nginx/主管正确设置芹菜

发布于 2025-01-22 17:15:04 字数 5043 浏览 6 评论 0原文

我的任务是更新MySQL表的每一行,但它非常慢。我很少需要这样做,只有当我改变一些基本的东西时,我认为了解多线程将是一个很大的变化。但是,所有在线示例和教程都在讨论一些事情,而不是其他事情,而我正在努力将所有信息拼凑在一起。
我知道我需要进行芹菜过程,我只是不知道我是否做对了。许多教程谈论了停靠重新的环境而不解释如何做的教程,所以我认为我会来这里进行一些真正的人与人类互动,以帮助我对此感到不那么愚蠢。到目前为止,我的代码是我的代码

/werfe/__ Int __。py

from flask import Flask, appcontext_popped, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_migrate import Migrate
from flask_wtf import CSRFProtect
import logging

import celery

#Path Math
import sys
import os
from . import config

db:SQLAlchemy = SQLAlchemy()
migrate = Migrate()
csrf = CSRFProtect()
celery: celery.Celery

DB_NAME = "main"

def create_app(name):
    #Flask Instance
    app = Flask(__name__)
    app.config.from_object(config.ProdTestConfig)
    
    # logging stuff

    #Database
    db.init_app(app)
    migrate.init_app(app, db)
    csrf.init_app(app)
    global celery
    celery = make_celery(app)
    
    with app.app_context():
        db.create_all()

        # Models and Blueprints here

        from .helper_functions import migration_handling as mgh
        #where you will find the thing I need to run async
        app.before_first_request(mgh.run_back_check)

        # log manager stuff

    #error page handling

    return app


def make_celery(app):
    celery = celery.Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

I've read some other ways seem to fit a bit better like using:
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, result_backend=Config.RESULT_BACKEND)

然后在create_app()中运行celery.conf.update(app.config)。这样的问题是,我不知道如何在托管网站和我的个人Windows机器的Linode计算机上设置Redis服务器。我已经安装了redis pip。这就是我试图运行异步外观的功能:

@celery.task(name='app.tasks.campaign_pay_out_process')
def campaign_pay_out_process():
    '''
    Process Every Campaigns Pay
    '''
    campaign: Campaigns
    for campaign in Campaigns.query.filter_by():
        campaign.process_pay()
    db.session.commit()
    current_app.logger.info('Done Campaign Pay Out Processing')

我正在从主管上运行Gunicorn,因为重新启动非常容易,并且消除了我的超长linux命令的生活以开始一个过程,这是很棒的。我知道这是芹菜的命令:芹菜-A芹菜_Worker.celery Worker -pool = solo -loglevel = info,我很想知道如何将其包括在我的工作流程中。这是我的主管配置:

[program:paymentwebapp]
directory=/home/sai/paymentWebApp
command=/home/sai/paymentWebApp/venv/bin/gunicorn --workers 1 --threads 3  wsgi:app
user=sai
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/paymentwebapp/paymentwebapp.err.log
stdout_logfile=/var/log/paymentwebapp/paymentwebapp.out.log

现在是我的烧瓶配置:

from os import environ, path
from dotenv import load_dotenv

DB_NAME = "main"

class Config:
    """Base config."""
    #SESSION_COOKIE_NAME = environ.get('SESSION_COOKIE_NAME')
    MAX_CONTENT_LENGTH = 16*1000*1000
    RECEIPT_FOLDER = '../uploads/receipts'
    IMPORT_FOLDER = 'uploads/imports'
    UPLOAD_FOLDER = 'uploads'
    EXPORT_FOLDER = '/uploads/exports'
    UPLOAD_EXTENSIONS = ['.jpg', '.png', '.pdf', '.csv', '.xls', '.xlsx']
    STATIC_FOLDER = 'static'
    TEMPLATES_FOLDER = 'templates'

class ProdConfig(Config):
    basedir = path.abspath(path.dirname(__file__))
    load_dotenv('/home/sai/.env')
    env_dict = dict(environ)
    FLASK_ENV = 'production'
    DEBUG = False
    TESTING = False
    SQLALCHEMY_DATABASE_URI = environ.get('PROD_DATABASE_URI')
    SECRET_KEY = environ.get('SECRET_KEY')
    SERVER_NAME = environ.get('SERVER_NAME')
    SESSION_COOKIE_SECURE = True
    WTF_CSRF_TIME_LIMIT = 600

    #Uploads


class DevConfig(Config):
    basedir = path.abspath(path.dirname(__file__))
    load_dotenv('C:\saiscripts\intercept_branch\Payment Web App Project\.env')
    env_dict = dict(environ)
    FLASK_ENV = 'development'
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = environ.get('DEV_DATABASE_URI')
    SECRET_KEY = environ.get('SECRET_KEY')
    
class ProdTestConfig(DevConfig):
    '''
    Developer config settings but production database server
    '''
    SQLALCHEMY_DATABASE_URI = environ.get('PROD_DATABASE_URI')

if __name__ == '__main__':
    print(environ.get('SQLALCHEMY_DATABASE_URI'))

这是我从教程中复制一些代码的地方,因为我应该让芹菜工人:

#!/usr/bin/env python
import os
#from app import create_app, celery
from website import create_app

app = create_app()
app.app_context().push()

from website import celery

I have a task of updating every single row of a MySQL table but it's super slow. I rarely need to do it and only when I change something fundamental, but I thought this would be a great change to learn about multi threading. However all the examples and tutorials online go over some things and not others and I'm struggling to piece all the information together.
I know I need to make a celery process I just don't know if I'm doing it right. A lot of tutorials talk about dockerizing a redis environment without explaining how to do it so I thought I'd come here for some real human-to-human interaction to maybe help me feel less stupid about this.Here's my code so far

/website/__init__.py

from flask import Flask, appcontext_popped, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_migrate import Migrate
from flask_wtf import CSRFProtect
import logging

import celery

#Path Math
import sys
import os
from . import config

db:SQLAlchemy = SQLAlchemy()
migrate = Migrate()
csrf = CSRFProtect()
celery: celery.Celery

DB_NAME = "main"

def create_app(name):
    #Flask Instance
    app = Flask(__name__)
    app.config.from_object(config.ProdTestConfig)
    
    # logging stuff

    #Database
    db.init_app(app)
    migrate.init_app(app, db)
    csrf.init_app(app)
    global celery
    celery = make_celery(app)
    
    with app.app_context():
        db.create_all()

        # Models and Blueprints here

        from .helper_functions import migration_handling as mgh
        #where you will find the thing I need to run async
        app.before_first_request(mgh.run_back_check)

        # log manager stuff

    #error page handling

    return app


def make_celery(app):
    celery = celery.Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

I've read some other ways seem to fit a bit better like using:

celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, result_backend=Config.RESULT_BACKEND)

Then in create_app() they run celery.conf.update(app.config). The issue with this is that I don't know how to setup a redis server on my linode machine hosting the site and my personal windows machine. I have redis pip installed. This is how the function I'm trying to run async looks:

@celery.task(name='app.tasks.campaign_pay_out_process')
def campaign_pay_out_process():
    '''
    Process Every Campaigns Pay
    '''
    campaign: Campaigns
    for campaign in Campaigns.query.filter_by():
        campaign.process_pay()
    db.session.commit()
    current_app.logger.info('Done Campaign Pay Out Processing')

I'm running gunicorn off of supervisor because restarting is super easy and ridding my life of super long linux commands to start a process has been great. I know this is the command for celery: celery -A celery_worker.celery worker --pool=solo --loglevel=info and I'd love to know how to include that in my work flow. Here's my supervisor config:

[program:paymentwebapp]
directory=/home/sai/paymentWebApp
command=/home/sai/paymentWebApp/venv/bin/gunicorn --workers 1 --threads 3  wsgi:app
user=sai
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/paymentwebapp/paymentwebapp.err.log
stdout_logfile=/var/log/paymentwebapp/paymentwebapp.out.log

Here's my flask config right now:

from os import environ, path
from dotenv import load_dotenv

DB_NAME = "main"

class Config:
    """Base config."""
    #SESSION_COOKIE_NAME = environ.get('SESSION_COOKIE_NAME')
    MAX_CONTENT_LENGTH = 16*1000*1000
    RECEIPT_FOLDER = '../uploads/receipts'
    IMPORT_FOLDER = 'uploads/imports'
    UPLOAD_FOLDER = 'uploads'
    EXPORT_FOLDER = '/uploads/exports'
    UPLOAD_EXTENSIONS = ['.jpg', '.png', '.pdf', '.csv', '.xls', '.xlsx']
    STATIC_FOLDER = 'static'
    TEMPLATES_FOLDER = 'templates'

class ProdConfig(Config):
    basedir = path.abspath(path.dirname(__file__))
    load_dotenv('/home/sai/.env')
    env_dict = dict(environ)
    FLASK_ENV = 'production'
    DEBUG = False
    TESTING = False
    SQLALCHEMY_DATABASE_URI = environ.get('PROD_DATABASE_URI')
    SECRET_KEY = environ.get('SECRET_KEY')
    SERVER_NAME = environ.get('SERVER_NAME')
    SESSION_COOKIE_SECURE = True
    WTF_CSRF_TIME_LIMIT = 600

    #Uploads


class DevConfig(Config):
    basedir = path.abspath(path.dirname(__file__))
    load_dotenv('C:\saiscripts\intercept_branch\Payment Web App Project\.env')
    env_dict = dict(environ)
    FLASK_ENV = 'development'
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = environ.get('DEV_DATABASE_URI')
    SECRET_KEY = environ.get('SECRET_KEY')
    
class ProdTestConfig(DevConfig):
    '''
    Developer config settings but production database server
    '''
    SQLALCHEMY_DATABASE_URI = environ.get('PROD_DATABASE_URI')

if __name__ == '__main__':
    print(environ.get('SQLALCHEMY_DATABASE_URI'))

This is where I copied some code from a tutorial because I'm supposed to make a celery worker:

#!/usr/bin/env python
import os
#from app import create_app, celery
from website import create_app

app = create_app()
app.app_context().push()

from website import celery

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文