Django workflow engine using signals and Celery-django

Posted 2024-10-11 00:25:00 · 4494 words · 5 views · 0 comments


I'm working on implementing a workflow engine in my project. The main goal is to create a portable app: something I can drop into any other project in the future, attach workflows to different models, and have it work.

I tried to think of an approach, but it doesn't seem to be the ideal setup. I thought of creating a workflow app inside my project with two kinds of models: some will contain the setup of the workflow (workflow, steps, actions), and the others will contain the instances/transactions.

Below is my workflow/models.py:

from django.db import models
from django.contrib.contenttypes.models import ContentType
from django.contrib.contenttypes import generic
import signals


################################
#   Workflow engine models
################################

class Signal_Listener(models.Model):
    LISTENING_ON = (
    ('I', 'INSERT'),
    ('U', 'UPDATE'),
    ('D', 'DELETE'),
    ('S', 'SELECT'),
    ('A', 'ANY QUERY'),
    ('L', 'ANY DDL'),
    )

    table_name = models.CharField(max_length=100)
    listening_to = models.CharField(max_length=1, choices=LISTENING_ON)

    class Meta:
        unique_together = (("table_name", "listening_to"),)

    def __unicode__(self):
        return '%s - %s' % (self.table_name, self.listening_to)

class Action(models.Model):
    ACTION_TYPE_CHOICES = (
    ('P', 'Python Script'   ),
    ('C', 'Class name'      ),
    )
    name = models.CharField(max_length=100)
    action_type = models.CharField(max_length=1, choices=ACTION_TYPE_CHOICES)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)

class Steps(models.Model):
    sequence = models.IntegerField() 
    Action = models.ForeignKey(Action)
    Signal_Listener = models.ForeignKey(Signal_Listener)

class Process(models.Model):
    # TODO: Document
    # The Process class stores information about the process itself,
    # in other words, the workflow name.
    WF_TYPE_LIST = (
    ('P', 'Python-API'),
    )

    name = models.CharField(max_length=30)
    is_active = models.BooleanField()
    wf_type = models.CharField(max_length=1, choices=WF_TYPE_LIST)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)
    listening_to = models.ForeignKey(Steps)



################################
#   Workflow transactions models
################################

class Instance(models.Model):
##  TODO: Document
# Re
    INSTANCE_STATUS = (
    ('I', 'In Progress' ),
    ('C', 'Cancelled'   ),
    ('A', 'Archived'     ), # Old completed tasks can be archived
    ('P', 'Pending'     ),
    ('O', 'Completed'   )
    )

    id = models.CharField(max_length=200, primary_key=True)
    status = models.CharField(max_length=1, choices=INSTANCE_STATUS, db_index=True)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)

    def save(self, *args, **kwargs):
        # On new records, generate a new UUID to use as the primary key.
        if not self.id:
            import uuid

            self.id = str(uuid.uuid4())
        # The class is named Instance, not Instances.
        super(Instance, self).save(*args, **kwargs)

class Task(models.Model):
    TASK_STATUS = (
    ('S', 'Assigned'    ),
    ('I', 'In Progress' ),
    ('P', 'Pending'     ),
    ('C', 'Cancelled'   ),
    ('A', 'Archived'    ), # Old completed tasks can be archived
    ('O', 'Completed'   )
    )
    name = models.CharField(max_length=100)
    instance = models.ForeignKey(Instance)
    status = models.CharField(max_length=1, choices=TASK_STATUS)
    bio = models.CharField(max_length=100)
    audit_obj = generic.GenericRelation('core.Audit', editable=False)

I also have a workflow/signals.py:

    """
    Workflow signals.

    Used to monitor all insert, update, delete or select statements.
    If an action is attached to that particular table, it will be handed
    to Celery for task distribution.
    """
    from django.db.models.signals import post_save, post_delete
    from django.core.cache import cache


    def workflow_post_init_listener(sender, **kwargs):
        # Imported here because workflow/models.py imports this module, so a
        # top-level import would be circular. The "workflow" package name is
        # assumed from the file paths above.
        from workflow.models import Signal_Listener

        table_name = kwargs['instance']._meta.db_table
        try:
            if cache.get('wf_listner_cache_%s' % table_name):
                pass
            else:
                record = Signal_Listener.objects.get(table_name__exact=table_name)
                # am not sure what to do next!
        except Signal_Listener.DoesNotExist:
            # TODO: Error logging
            pass

    post_save.connect(workflow_post_init_listener, dispatch_uid="workflow.models.listener.save")

I think my model design might need to be improved as well. It could be used in several scenarios, and I was thinking of starting with an approval cycle. For example, I could insert the table_name/model_name into signal_listener to monitor for new inserts; when one occurs, I would trigger a particular workflow.
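To make the idea concrete, here is a minimal, framework-free sketch of that lookup. It assumes the listener rows are available as a plain dictionary keyed by (table name, event code); `LISTENERS`, `event_code`, `find_workflow`, and the `orders_order` table are hypothetical names used only for illustration. The one Django fact it relies on is that `post_save` passes a `created` flag, which is `True` for a new row.

```python
# Hypothetical in-memory stand-in for the Signal_Listener table:
# (table_name, listening_to code) -> name of the workflow to trigger.
LISTENERS = {
    ("orders_order", "I"): "approval_cycle",
}


def event_code(created):
    """Map post_save's `created` flag to the LISTENING_ON codes above."""
    return "I" if created else "U"


def find_workflow(table_name, created, listeners=LISTENERS):
    """Return the workflow registered for this table and event, or None.

    Falls back to the 'A' (ANY QUERY) code when there is no exact match.
    """
    code = event_code(created)
    exact = listeners.get((table_name, code))
    if exact is not None:
        return exact
    return listeners.get((table_name, "A"))
```

Inside the signal handler this would be called with `kwargs['instance']._meta.db_table` and `kwargs['created']`; a non-None result is the point at which a workflow instance could be created and handed off to a task queue.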

As for actions, I understand that they will need to be developed. Maybe I will need to create an actions folder under the workflow app and put each action in a class. Each one would do a particular task: sending emails, archiving, updating a database value, etc.
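One possible shape for this, sketched in plain Python under my own assumptions: each action is a class with a `run` method, registered by name so that a step stored in the database can refer to it. `BaseAction`, `register`, `ACTIONS`, `run_steps`, and the two example actions are all hypothetical, and the bodies only return strings where real actions would send mail or touch the database.

```python
ACTIONS = {}  # action name -> action class


def register(cls):
    """Class decorator that records an action under its `name`."""
    ACTIONS[cls.name] = cls
    return cls


class BaseAction(object):
    name = None

    def run(self, instance):
        raise NotImplementedError


@register
class SendEmail(BaseAction):
    name = "send_email"

    def run(self, instance):
        # Placeholder: a real action would send an email here.
        return "email sent for %s" % (instance,)


@register
class Archive(BaseAction):
    name = "archive"

    def run(self, instance):
        # Placeholder: a real action would update the record's status.
        return "archived %s" % (instance,)


def run_steps(step_names, instance):
    """Run the named actions in sequence and collect their results."""
    return [ACTIONS[name]().run(instance) for name in step_names]
```

For example, `run_steps(["send_email", "archive"], "order-1")` runs both actions in order. In the real app, each `run` call could be wrapped in a Celery task so that steps execute outside the request cycle.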

If I am reinventing the wheel and something like this has already been developed, I would be more than happy to go through any suggestions.

Best regards,


Comments (1)

与他有关 2024-10-18 00:25:00

For example, you can have a look at zope.wfmc (http://pypi.python.org/pypi/zope.wfmc): it is an implementation of Workflow Management Coalition (WfMC) workflows, which can be defined in XPDL.
