Setting up Celery correctly with Flask Application Factory Pattern/gunicorn/nginx/supervisor
I have a task that updates every single row of a MySQL table, but it's super slow. I rarely need to run it, only when I change something fundamental, but I thought this would be a great chance to learn about multithreading. However, all the examples and tutorials online cover some things and not others, and I'm struggling to piece all the information together.
I know I need to make a Celery process; I just don't know if I'm doing it right. A lot of tutorials talk about dockerizing a Redis environment without explaining how to do it, so I thought I'd come here for some real human-to-human interaction to maybe help me feel less stupid about this. Here's my code so far, in /website/__init__.py:
from flask import Flask, appcontext_popped, render_template
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_migrate import Migrate
from flask_wtf import CSRFProtect
import logging
from celery import Celery

# Path Math
import sys
import os

from . import config

db: SQLAlchemy = SQLAlchemy()
migrate = Migrate()
csrf = CSRFProtect()
celery: Celery  # bound inside create_app()
DB_NAME = "main"


def create_app(name=None):
    # name is currently unused; the default lets create_app() be called without arguments
    # Flask Instance
    app = Flask(__name__)
    app.config.from_object(config.ProdTestConfig)

    # logging stuff

    # Database
    db.init_app(app)
    migrate.init_app(app, db)
    csrf.init_app(app)

    global celery
    celery = make_celery(app)

    with app.app_context():
        db.create_all()

    # Models and Blueprints here

    from .helper_functions import migration_handling as mgh
    # where you will find the thing I need to run async
    # (before_first_request was removed in Flask 2.3)
    app.before_first_request(mgh.run_back_check)

    # log manager stuff
    # error page handling

    return app


def make_celery(app):
    # Use a distinct local name so it does not shadow the imported Celery class
    celery_app = Celery(
        app.import_name,
        backend=app.config['CELERY_RESULT_BACKEND'],
        broker=app.config['CELERY_BROKER_URL']
    )
    celery_app.conf.update(app.config)

    class ContextTask(celery_app.Task):
        # Run every task inside the Flask application context
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app.Task = ContextTask
    return celery_app
I've read that some other approaches seem to fit a bit better, like using:
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL, result_backend=Config.RESULT_BACKEND)
Then in create_app() they run celery.conf.update(app.config). The issue with this is that I don't know how to set up a Redis server on the Linode machine hosting the site or on my personal Windows machine. I have the redis pip package installed. This is how the function I'm trying to run async looks:
from flask import current_app


@celery.task(name='app.tasks.campaign_pay_out_process')
def campaign_pay_out_process():
    '''
    Process Every Campaigns Pay
    '''
    campaign: Campaigns
    # filter_by() with no arguments selects every row
    for campaign in Campaigns.query.filter_by():
        campaign.process_pay()
    db.session.commit()
    current_app.logger.info('Done Campaign Pay Out Processing')
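Independent of the Celery wiring, one possible way to make a job like this easier to spread across workers is to split the row ids into batches, so that each batch could be handed to its own task. This is only a minimal sketch, assuming integer primary keys. The `chunked` helper, the batch size, and the sample ids are hypothetical and not part of the original code.

```python
from itertools import islice
from typing import Iterable, Iterator, List


def chunked(ids: Iterable[int], size: int) -> Iterator[List[int]]:
    """Yield successive lists of at most `size` ids."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(ids)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# Hypothetical campaign ids; in the real app they would come from the database.
batches = list(chunked(range(1, 8), 3))
print(batches)  # [[1, 2, 3], [4, 5, 6], [7]]
```

Each batch could then be passed as an argument to a task, which keeps individual tasks short and lets several workers share the load.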
I'm running gunicorn under supervisor because restarting is super easy, and not having to type super long Linux commands to start a process has been great. I know this is the command for celery: celery -A celery_worker.celery worker --pool=solo --loglevel=info
and I'd love to know how to include that in my workflow. Here's my supervisor config:
[program:paymentwebapp]
directory=/home/sai/paymentWebApp
command=/home/sai/paymentWebApp/venv/bin/gunicorn --workers 1 --threads 3 wsgi:app
user=sai
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stderr_logfile=/var/log/paymentwebapp/paymentwebapp.err.log
stdout_logfile=/var/log/paymentwebapp/paymentwebapp.out.log
Here's my flask config right now:
from os import environ, path
from dotenv import load_dotenv

DB_NAME = "main"


class Config:
    """Base config."""
    # SESSION_COOKIE_NAME = environ.get('SESSION_COOKIE_NAME')
    MAX_CONTENT_LENGTH = 16*1000*1000
    RECEIPT_FOLDER = '../uploads/receipts'
    IMPORT_FOLDER = 'uploads/imports'
    UPLOAD_FOLDER = 'uploads'
    EXPORT_FOLDER = '/uploads/exports'
    UPLOAD_EXTENSIONS = ['.jpg', '.png', '.pdf', '.csv', '.xls', '.xlsx']
    STATIC_FOLDER = 'static'
    TEMPLATES_FOLDER = 'templates'


class ProdConfig(Config):
    basedir = path.abspath(path.dirname(__file__))
    load_dotenv('/home/sai/.env')
    env_dict = dict(environ)
    FLASK_ENV = 'production'
    DEBUG = False
    TESTING = False
    SQLALCHEMY_DATABASE_URI = environ.get('PROD_DATABASE_URI')
    SECRET_KEY = environ.get('SECRET_KEY')
    SERVER_NAME = environ.get('SERVER_NAME')
    SESSION_COOKIE_SECURE = True
    WTF_CSRF_TIME_LIMIT = 600
    # Uploads


class DevConfig(Config):
    basedir = path.abspath(path.dirname(__file__))
    # Raw string so the backslashes in the Windows path are not treated as escapes
    load_dotenv(r'C:\saiscripts\intercept_branch\Payment Web App Project\.env')
    env_dict = dict(environ)
    FLASK_ENV = 'development'
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = environ.get('DEV_DATABASE_URI')
    SECRET_KEY = environ.get('SECRET_KEY')


class ProdTestConfig(DevConfig):
    '''
    Developer config settings but production database server
    '''
    SQLALCHEMY_DATABASE_URI = environ.get('PROD_DATABASE_URI')


if __name__ == '__main__':
    print(environ.get('SQLALCHEMY_DATABASE_URI'))
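The make_celery() function shown earlier reads app.config['CELERY_BROKER_URL'] and app.config['CELERY_RESULT_BACKEND'], but none of the config classes define those keys. Here is a minimal sketch of how they might be supplied, assuming a Redis server reachable on localhost at the default port. The `CeleryConfigMixin` class, the environment variable names, and the default URL are assumptions for illustration only.

```python
from os import environ


class CeleryConfigMixin:
    """Hypothetical mixin holding the two keys that make_celery() reads."""
    # Assumed default: Redis on localhost, default port 6379, database 0.
    CELERY_BROKER_URL = environ.get(
        'CELERY_BROKER_URL', 'redis://localhost:6379/0')
    CELERY_RESULT_BACKEND = environ.get(
        'CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')


print(CeleryConfigMixin.CELERY_BROKER_URL)
```

A config class could inherit from this mixin alongside its existing base, so the values can still be overridden per machine through the environment or a .env file.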
This is where I copied some code from a tutorial because I'm supposed to make a celery worker:
#!/usr/bin/env python
import os
#from app import create_app, celery
from website import create_app
app = create_app()
app.app_context().push()
from website import celery