How to create a Celery Windows Service?

Posted on 2025-01-07 20:22:55

I'm trying to create a Windows service to launch Celery. I came across an article that does it using Task Scheduler. However, it seems to launch numerous Celery instances and keeps eating up memory until the machine dies. Is there any way to launch it as a Windows service?

Comments (5)

小忆控 2025-01-14 20:22:55

I got the answer from another website. Celeryd (the daemon service for Celery) runs as a Paster application, and searching for 'Paster Windows Service' led me here. It describes how to run a Pylons application as a Windows service. Being new to the Paster framework and to hosting Python web services, it didn't cross my mind to check it at first. But that solution works for Celery with a slight change here and there in the script.

I've modified the script to make it easier to change the Celery settings. The essential changes are:

  1. Create an INI file with the settings for the Celery service (shown below)
  2. Create a Python script to create a Windows service.

INI file settings (celeryd.ini):

[celery:service]
service_name = CeleryService
service_display_name = Celery Service
service_description = WSCGI Windows Celery Service
service_logfile = celeryd.log
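For readers on Python 3, the same settings could be read with the standard `configparser` module. The following is a minimal sketch rather than the script from this answer: the function name `load_service_settings` and the `DEFAULTS` dictionary are illustrative assumptions, while the section and option names are taken from the INI file above.

```python
import configparser
import os
import tempfile

SECTION = 'celery:service'

# Fallback values (assumed here for illustration), used when an option is missing.
DEFAULTS = {
    'service_name': 'CeleryService',
    'service_display_name': 'Celery Service',
    'service_description': 'WSCGI Windows Celery Service',
    'service_logfile': 'celeryd.log',
}


def load_service_settings(ini_path):
    """Return the service settings from ini_path, using DEFAULTS for gaps."""
    parser = configparser.ConfigParser()
    parser.read(ini_path)  # a missing file is silently ignored
    settings = dict(DEFAULTS)
    if parser.has_section(SECTION):
        for key in DEFAULTS:
            if parser.has_option(SECTION, key):
                settings[key] = parser.get(SECTION, key)
    return settings


if __name__ == '__main__':
    # Write a partial INI file to a temporary directory and read it back.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'celeryd.ini')
        with open(path, 'w') as fh:
            fh.write('[celery:service]\nservice_name = MyCelery\n')
        print(load_service_settings(path))
```

Unlike the script in this answer, this sketch does not write defaults back to the INI file or exit; it only fills in missing options in memory.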

Python script to create Windows Service (CeleryService.py):

"""
The most basic (working) Windows service possible.
Requires Mark Hammond's pywin32 package.  
Most of the code was taken from a  CherryPy 2.2 example of how to set up a service
"""
import pkg_resources
import win32serviceutil
from paste.script.serve import ServeCommand as Server
import os, sys
import ConfigParser

import win32service
import win32event

SCRIPT_DIR          = os.path.abspath(os.path.dirname(__file__))
INI_FILE            = 'celeryd.ini'
SERV_SECTION        = 'celery:service'
SERV_NAME           = 'service_name'
SERV_DISPLAY_NAME   = 'service_display_name'
SERV_DESC           = 'service_description'
SERV_LOG_FILE       = 'service_logfile'
SERV_APPLICATION    = 'celeryd'
SERV_LOG_FILE_VAR   = 'CELERYD_LOG_FILE'

# Default Values
SERV_NAME_DEFAULT           = 'CeleryService'
SERV_DISPLAY_NAME_DEFAULT   = 'Celery Service'
SERV_DESC_DEFAULT           = 'WSCGI Windows Celery Service'
SERV_LOG_FILE_DEFAULT       = r'D:\logs\celery.log'

class DefaultSettings(object):
    def __init__(self):
        if SCRIPT_DIR:
            os.chdir(SCRIPT_DIR)
        # find the ini file
        self.ini = os.path.join(SCRIPT_DIR,INI_FILE)
        # create a config parser object and populate it with the ini file
        c = ConfigParser.SafeConfigParser()
        c.read(self.ini)
        self.c = c

    def getDefaults(self):
        '''
        Check for and get the default settings
        '''
        if (
            (not self.c.has_section(SERV_SECTION)) or
            (not self.c.has_option(SERV_SECTION, SERV_NAME)) or
            (not self.c.has_option(SERV_SECTION, SERV_DISPLAY_NAME)) or
            (not self.c.has_option(SERV_SECTION, SERV_DESC)) or
            (not self.c.has_option(SERV_SECTION, SERV_LOG_FILE))
            ):
            print 'setting defaults'
            self.setDefaults()
        service_name = self.c.get(SERV_SECTION, SERV_NAME)
        service_display_name = self.c.get(SERV_SECTION, SERV_DISPLAY_NAME)
        service_description = self.c.get(SERV_SECTION, SERV_DESC)
        iniFile = self.ini
        service_logfile = self.c.get(SERV_SECTION, SERV_LOG_FILE)
        return service_name, service_display_name, service_description, iniFile, service_logfile

    def setDefaults(self):
        '''
        set and add the default setting to the ini file
        '''
        if not self.c.has_section(SERV_SECTION):
            self.c.add_section(SERV_SECTION)
        self.c.set(SERV_SECTION, SERV_NAME, SERV_NAME_DEFAULT)
        self.c.set(SERV_SECTION, SERV_DISPLAY_NAME, SERV_DISPLAY_NAME_DEFAULT)
        self.c.set(SERV_SECTION, SERV_DESC, SERV_DESC_DEFAULT)
        self.c.set(SERV_SECTION, SERV_LOG_FILE, SERV_LOG_FILE_DEFAULT)
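        # 'wr' is not a valid file mode; open for writing instead.
        # The with-block closes the file once the settings are written.
        with open(self.ini, 'w') as cfg:
            self.c.write(cfg)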
        print '''
you must set the celery:service section service_name, service_display_name,
and service_description options to define the service 
in the %s file
''' % self.ini
        sys.exit()


class CeleryService(win32serviceutil.ServiceFramework):
    """NT Service."""

    d = DefaultSettings()
    service_name, service_display_name, service_description, iniFile, logFile = d.getDefaults()

    _svc_name_ = service_name
    _svc_display_name_ = service_display_name
    _svc_description_ = service_description

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        # create an event that SvcDoRun can wait on and SvcStop
        # can set.
        self.stop_event = win32event.CreateEvent(None, 0, 0, None)

    def SvcDoRun(self):
        os.chdir(SCRIPT_DIR)
        s = Server(SERV_APPLICATION)
        os.environ[SERV_LOG_FILE_VAR] = self.logFile
        s.run([self.iniFile])
        win32event.WaitForSingleObject(self.stop_event, win32event.INFINITE)

    def SvcStop(self):
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        #win32event.SetEvent(self.stop_event)
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)
        sys.exit()

if __name__ == '__main__':
    win32serviceutil.HandleCommandLine(CeleryService)

To install the service, run python CeleryService.py install, and then python CeleryService.py start to start it. NOTE: Run these commands from a command prompt with administrator rights.

If the service needs to be removed, run python CeleryService.py remove.

I was trying to host Celery as part of enhancing my RhodeCode installation. This solution seems to work. Hope this will help someone.
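The comment in `__init__` above describes an event that SvcDoRun waits on and SvcStop sets. That hand-off can be illustrated without pywin32 by using `threading.Event` as a stand-in for the win32event object. The class below is a hypothetical analogy for illustration only; it is not a Windows service and its names (`FakeService`, `do_run`, `stop`) are assumptions.

```python
import threading
import time


class FakeService:
    """Stand-in mimicking the SvcDoRun / SvcStop hand-off with threading.Event."""

    def __init__(self):
        self.stop_event = threading.Event()
        self.ticks = 0

    def do_run(self, poll_seconds=0.05):
        # Wake up periodically, similar to waiting on an event with a timeout,
        # and leave the loop once the stop event has been set.
        while not self.stop_event.wait(poll_seconds):
            self.ticks += 1

    def stop(self):
        # Called from another thread, like a stop request from the service manager.
        self.stop_event.set()


if __name__ == '__main__':
    svc = FakeService()
    runner = threading.Thread(target=svc.do_run)
    runner.start()
    time.sleep(0.2)
    svc.stop()
    runner.join(timeout=2)
    print('stopped cleanly:', not runner.is_alive())
```

The point of the pattern is that the stop handler only signals, and the run loop is the one that notices the signal and returns.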

土豪我们做朋友吧 2025-01-14 20:22:55

The accepted answer does not apply to running Celery with a Django application. But it inspired me to come up with a solution for running Celery as a Windows service with Django. Note that the following is for Django projects only. It may work with other applications with some modifications.

The following discussion assumes Python >= 3.6 and RabbitMQ are already installed, and rabbitmq-server is running on localhost.

Create a file celery_service.py (or whatever you like) inside your Django project's top level folder, same level as manage.py, with the following content:

'''Usage : python celery_service.py install (start / stop / remove)
Run celery as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
from pathlib import Path
import shlex
import logging
import time

# The directory for celery.log and celery_service.log
# Default: the directory of this script
INSTDIR = Path(__file__).parent
# The path of python Scripts
# Usually it is in path_to/venv/Scripts.
# If it is already in system PATH, then it can be set as Path('')
PYTHONSCRIPTPATH = INSTDIR / 'venvcelery/Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'proj'

logging.basicConfig(
    filename = INSTDIR / 'celery_service.log',
    level = logging.DEBUG, 
    format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)

class CeleryService(win32serviceutil.ServiceFramework):

    _svc_name_ = "Celery"
    _svc_display_name_ = "Celery Distributed Task Queue Service"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)           

    def SvcStop(self):
        logging.info('Stopping {name} service ...'.format(name=self._svc_name_))
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        # Signal SvcDoRun so that it terminates the worker process and returns;
        # the service framework reports the stopped state once SvcDoRun exits.
        win32event.SetEvent(self.hWaitStop)

    def SvcDoRun(self):
        logging.info('Starting {name} service ...'.format(name=self._svc_name_))
        os.chdir(INSTDIR) # so that proj worker can be found
        logging.info('cwd: ' + os.getcwd())
        self.ReportServiceStatus(win32service.SERVICE_RUNNING)
        command = '"{celery_path}" -A {proj_dir} worker -f "{log_path}" -l info -P eventlet'.format(
            celery_path=PYTHONSCRIPTPATH / 'celery.exe',
            proj_dir=PROJECTDIR,
            log_path=INSTDIR / 'celery.log')
        logging.info('command: ' + command)
        args = shlex.split(command)
        proc = subprocess.Popen(args)
        logging.info('pid: {pid}'.format(pid=proc.pid))
        self.timeout = 3000
        while True:
            rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
            if rc == win32event.WAIT_OBJECT_0:
                # stop signal encountered
                # terminate process 'proc'
                PROCESS_TERMINATE = 1
                handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
                win32api.TerminateProcess(handle, -1)
                win32api.CloseHandle(handle)                
                break

if __name__ == '__main__':
    win32serviceutil.HandleCommandLine(CeleryService)

Before the script can be run, you need to

  1. Optionally create a python virtual environment e.g. 'venvcelery'.

  2. Install the following requirements:

    django>=2.0.0
    sqlalchemy>=1.0.14
    celery>=4.3.0,<5.0
    pywin32>=227
    eventlet>=0.25

  3. Fix pywin32 pywintypes36.dll location. ref

  4. Correctly set PYTHONSCRIPTPATH and PROJECTDIR in celery_service.py

    PYTHONSCRIPTPATH is usually the "Scripts" folder under your Python installation path or your current virtual environment.

    PROJECTDIR is the directory name of the Django project. It is the directory at the same level as manage.py, not the parent directory.
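The script above assembles the worker command as one string and then splits it with shlex.split. An alternative is to build the argument list directly, which sidesteps quoting concerns for paths that contain spaces. This is a minimal sketch using the same flags as the script; the helper name `build_worker_args` and its parameters are hypothetical.

```python
from pathlib import Path


def build_worker_args(scripts_dir, project, log_path, pool='eventlet'):
    """Return the celery worker command as a list suitable for subprocess.Popen."""
    return [
        str(Path(scripts_dir) / 'celery.exe'),
        '-A', project,
        'worker',
        '-f', str(log_path),
        '-l', 'info',
        '-P', pool,
    ]


if __name__ == '__main__':
    # Assumed layout: a virtual environment named 'venvcelery' next to the project.
    inst_dir = Path.cwd()
    args = build_worker_args(inst_dir / 'venvcelery' / 'Scripts', 'proj',
                             inst_dir / 'celery.log')
    print(args)
```

The resulting list can be passed to subprocess.Popen in place of the output of shlex.split.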

Now you can install / start / stop / remove the service with:

python celery_service.py install
python celery_service.py start
python celery_service.py stop
python celery_service.py remove

In case you are interested in a running example, I created a demo Django project with Celery running as a Windows service:

https://github.com/azalea/django_celery_windows_service


Note: this is an updated version assuming Python >= 3.6, Django 2.2 and Celery 4.

An older version with Python 2.7, Django 1.6 and Celery 3 can be viewed in edit history.

温柔戏命师 2025-01-14 20:22:55

@azalea's answer helped me a lot, but one thing I'd like to highlight is that the service (celery_service.py) needs to be installed with your user name and password. Otherwise, when subprocess.Popen(args) runs in the SvcDoRun() function, nothing will happen because of a permission issue. To set the user name and password, you can choose one of two methods:

  1. Using command line:

    python33 .\celeryService1.py --username .\USERNAME --password PASSWORD install
    
  2. Go to Computer Management (Local) > Services and Applications > Services, find your service (in @azalea's example, it is "Celery Distributed Task Queue Service"), right-click it to open the Properties page, and on the Log On tab select "This account" and enter your user name and password.

°如果伤别离去 2025-01-14 20:22:55

There is a good project here, but I didn't manage to get it working:
Link to the GitHub of django-windows-tools.
It gave me a timeout at the last command line, and I didn't have enough time to find out why.

The package allows setting up FastCGI, Celery and static files for a Django project on IIS.

只怪假的太真实 2025-01-14 20:22:55

Thanks to Azalea, as this led me down the path of creating two Windows services with Celery 4 on Windows:

One to start/stop multiple workers.
A second to start/stop the beat service and tidy up the pid file using Celery 4.

The only caveat, for which I do not have a solution, is that you cannot restart the workers, because you need to ensure that the spawned worker processes have stopped before starting back up.
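One possible direction for the restart caveat is to wait until the child process has really exited before the service reports that it has stopped. The following is only a sketch of that idea using the standard library, and it has not been tested against Celery on Windows. The helper name `stop_process` and its timeout are illustrative assumptions, and it only covers the process started by Popen, not any further processes that process may have spawned.

```python
import subprocess
import sys


def stop_process(proc, timeout=10.0):
    """Terminate proc and block until it has exited; return its exit code."""
    if proc.poll() is None:          # still running
        proc.terminate()             # TerminateProcess on Windows, SIGTERM elsewhere
        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()              # last resort if terminate() was not enough
            proc.wait()
    return proc.returncode


if __name__ == '__main__':
    # Stand-in for the celery worker: a child process that sleeps for a minute.
    child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])
    code = stop_process(child)
    print('child exited with', code)
```

Because the helper returns only after the process has exited, a start that follows it would not overlap with the old process.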

Workers.py:

'''Usage : python celery_service.py install (start / stop / remove)
Run celery as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
import shlex
import logging
import time

# The directory for celery_worker.log and celery_worker_service.log
# Default: the directory of this script
INSTDIR = r'X:\Application\Project'
LOGDIR = r'X:\Application\LogFiles'
# The path of python Scripts
# Usually it is in PYTHON_INSTALL_DIR/Scripts. e.g.
# r'C:\Python27\Scripts'
# If it is already in system PATH, then it can be set as ''
PYTHONSCRIPTPATH = r'C:\Python36\Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'Project'

logging.basicConfig(
    filename = os.path.join(LOGDIR, 'celery_worker_service.log'),
    level = logging.DEBUG, 
    format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)

class CeleryService(win32serviceutil.ServiceFramework):

    _svc_name_ = "CeleryWorkers"
    _svc_display_name_ = "CeleryWorkers"

    def __init__(self, args): 
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)

    def SvcStop(self):
        logging.info('Stopping {name} service ...'.format(name=self._svc_name_))        
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)
        logging.info('Stopped1 {name} service ...'.format(name=self._svc_name_))   
        logging.info('Stopped3 {name} service ...'.format(name=self._svc_name_)) 
        command = '"{celery_path}" -A {proj_dir} --workdir=X:/Application/Project control shutdown --timeout=10'.format(
            celery_path=os.path.join(PYTHONSCRIPTPATH, 'celery.exe'),
            proj_dir=PROJECTDIR)
        logging.info('command: ' + command)
        args = shlex.split(command)
        proc = subprocess.Popen(args)
        logging.info('Stopped celery shutdown  ...') 
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)
        logging.info('Stopped2 {name} service ...'.format(name=self._svc_name_))  
        sys.exit()           

    def SvcDoRun(self):
        logging.info('Starting {name} service ...'.format(name=self._svc_name_))
        os.chdir(INSTDIR) # so that proj worker can be found
        logging.info('cwd: ' + os.getcwd())
        self.ReportServiceStatus(win32service.SERVICE_RUNNING)
        command = '"{celery_path}" -A {proj_dir} -c 8 worker --workdir=X:/Application/Project --pidfile=celeryservice.pid  -f "{log_path}" -l info'.format(
            celery_path=os.path.join(PYTHONSCRIPTPATH, 'celery.exe'),
            proj_dir=PROJECTDIR,
            log_path=os.path.join(LOGDIR,'celery_worker.log'))
        logging.info('command: ' + command)
        args = shlex.split(command)
        proc = subprocess.Popen(args)
        logging.info('pid: {pid}'.format(pid=proc.pid))
        self.timeout = 3000
        while True:
            rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
            if rc == win32event.WAIT_OBJECT_0:
                # stop signal encountered
                # terminate process 'proc'
                PROCESS_TERMINATE = 1
                handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
                win32api.TerminateProcess(handle, -1)
                win32api.CloseHandle(handle)                
                break

if __name__ == '__main__':
   win32serviceutil.HandleCommandLine(CeleryService)
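
As an aside, the stop branch in `SvcDoRun` above terminates the worker as soon as the stop event fires. Here is a minimal sketch of a gentler pattern, assuming Python 3 and a `subprocess.Popen` handle like `proc`: give the child a grace period to exit on its own, and kill it only if it overstays. The helper name `stop_child` and the `grace_seconds` parameter are hypothetical. They are not part of the original script, pywin32, or Celery.

```python
import subprocess
import sys


def stop_child(proc, grace_seconds=10.0):
    """Wait up to grace_seconds for proc to exit; kill it if it does not.

    Returns the child's exit code.
    """
    try:
        return proc.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        # The child did not exit in time, so fall back to a hard kill.
        proc.kill()
        return proc.wait()


if __name__ == "__main__":
    # Stand-in for the celery worker: a child that would sleep for a minute.
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"])
    stop_child(child, grace_seconds=0.5)
    print("child exited:", child.poll() is not None)
```

In the service this could be called from the stop branch once the `control shutdown` command has been sent, in place of the immediate `TerminateProcess` call. Whether a grace period is appropriate depends on how long your tasks take to finish.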

Beatservice.py:

'''Usage : python Beatservice.py install (start / stop / remove)
Run celery beat as a Windows service
'''
import win32service
import win32serviceutil
import win32api
import win32con
import win32event
import subprocess
import sys
import os
import shlex
import logging
import time
import signal

# The directory for celery_beat.log and celery_beat_service.log
# Default: the directory of this script
INSTDIR = os.path.dirname(os.path.realpath(__file__))
LOGPATH = r'X:\Application\Logs'
# The path of python Scripts
# Usually it is in PYTHON_INSTALL_DIR/Scripts. e.g.
# r'C:\Python27\Scripts'
# If it is already in system PATH, then it can be set as ''
PYTHONSCRIPTPATH = r'C:\Python36\Scripts'
# The directory name of django project
# Note: it is the directory at the same level of manage.py
# not the parent directory
PROJECTDIR = 'PROJECT'

logging.basicConfig(
    filename = os.path.join(LOGPATH, 'celery_beat_service.log'),
    level = logging.DEBUG, 
    format = '[%(asctime)-15s: %(levelname)-7.7s] %(message)s'
)

class CeleryService(win32serviceutil.ServiceFramework):

    _svc_name_ = "CeleryBeat"
    _svc_display_name_ = "CeleryBeat"

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)           

    def SvcStop(self):
        logging.info('Stopping {name} service ...'.format(name=self._svc_name_))
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)
        # Read the PID that celery beat recorded in its pid file
        pid_file = r'X:\Application\Project\celerybeat.pid'
        with open(pid_file, 'r') as pidno:
            pid = pidno.read().strip()
        logging.info('beat pid: ' + pid)
        # Force-kill the beat process by PID
        killargs = ['taskkill', '/F', '/PID', pid]
        logging.info('command: ' + ' '.join(killargs))
        # Capture output so that communicate() returns something to log
        process = subprocess.Popen(killargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = process.communicate()
        logging.info(output)
        # Remove the pid file now that the process has been killed
        os.remove(pid_file)
        logging.info(pid_file + ' file removed')
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)
        sys.exit()

    def SvcDoRun(self):
        logging.info('Starting {name} service ...'.format(name=self._svc_name_))
        os.chdir(INSTDIR) # so that proj worker can be found
        logging.info('cwd: ' + os.getcwd())
        self.ReportServiceStatus(win32service.SERVICE_RUNNING)
        command = '"{celery_path}" -A {proj_dir} beat --workdir=X:/Application/Project -f X:/Application/logs/beat.log -l info'.format(
            celery_path=os.path.join(PYTHONSCRIPTPATH, 'celery.exe'),
            proj_dir=PROJECTDIR)
        logging.info('command: ' + command)
        args = shlex.split(command)
        proc = subprocess.Popen(args)
        logging.info('pid: {pid}'.format(pid=proc.pid))
        self.timeout = 3000
        while True:
            rc = win32event.WaitForSingleObject(self.hWaitStop, self.timeout)
            if rc == win32event.WAIT_OBJECT_0:
                # stop signal encountered
                # terminate process 'proc'
                PROCESS_TERMINATE = 1
                handle = win32api.OpenProcess(PROCESS_TERMINATE, False, proc.pid)
                win32api.TerminateProcess(handle, -1)
                win32api.CloseHandle(handle)                
                break

if __name__ == '__main__':
   win32serviceutil.HandleCommandLine(CeleryService)
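
The pid-file handling in the beat service's `SvcStop` can be pulled out into small functions and checked without installing a service. This is a minimal sketch, assuming the pid file holds a single integer. `read_pid` and `taskkill_args` are hypothetical helper names, not part of the original script. Building the command as a list means no command string has to be parsed with `shlex.split`.

```python
import os
import tempfile


def read_pid(pid_path):
    """Return the integer PID stored in pid_path."""
    with open(pid_path, "r") as fh:
        return int(fh.read().strip())


def taskkill_args(pid, force=True):
    """Build the argument list for the Windows taskkill command."""
    args = ["taskkill"]
    if force:
        args.append("/F")  # forcefully terminate the process
    args += ["/PID", str(pid)]
    return args


if __name__ == "__main__":
    # Demonstrate with a throwaway pid file; taskkill itself is not run here.
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "celerybeat.pid")
        with open(path, "w") as fh:
            fh.write("1234\n")
        print(taskkill_args(read_pid(path)))
```

The resulting list can be passed straight to `subprocess.Popen` on Windows.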