- 1. 简介
- 2. 开始
- 3. 配置
- 4. Flowable API
- 5. 集成 Spring
- 6. 部署
- 7. BPMN 2.0 介绍
- 8. BPMN 2.0 结构
- 9. 表单
- 10. JPA
- 11. 历史
- 12. 身份管理
- 13. Eclipse Designer
- 14. Flowable UI 应用
- 15. REST API
- 16. 集成 CDI
- 17. 集成 LDAP
- 18. 高级
- 19. 工具
18.1. 异步执行器
Flowable V5版本中,在之前的作业执行器(job executor)之外,还提供了异步执行器(async executor)。异步执行器已被许多Flowable的用户及我们自己的跑分证明,性能比老的作业执行器好。
从Flowable V6起,将只提供异步执行器。在V6中,对异步执行器进行了完全的重构,以提升性能及易用性。当然仍然与已有的API兼容。
18.1.1. 异步执行器的设计
Flowable有两种作业类型:定时器(例如边界事件或用户任务中的定时器)以及异步操作(带有flowable:async="true"属性的服务任务)。
定时器很容易理解:保存在ACT_RU_TIMER_JOB表中,并带有给定的到期日期。异步执行器中有一个线程,周期性地检查是否有需要触发的定时器(也就是说,到期日期在当前时间“之前”)。当需要触发定时器时,从JOB表中移除该定时器,并创建一个异步作业(async job)。
异步作业在执行流程实例(即调用API)时插入数据库。如果当前Flowable引擎启用了异步执行器,则该异步作业将被锁定(locked)。即在ACT_RU_JOB表中插入一个作业条目,并设置其lock owner(锁持有人)与lock expiration time(锁到期时间)。在API调用成功后触发的事务监听器(transaction commit listener),将会触发同一引擎中的异步执行器,让其执行该作业(因此可以保证数据库中已经保存了数据)。为此,异步执行器使用(可配置的)线程池,从其中取出线程用于执行作业,并使流程可以异步进行。如果Flowable引擎未启用异步执行器,则异步作业仍会插入ACT_RU_JOB表,但不会被锁定。
与检查定时器的线程类似,异步执行器中也有一个用于“获取”新的异步作业的线程。这里的异步作业,指的是表中存储但未被锁定的作业。这个线程会将这些作业锁定给当前Flowable引擎,并发送至异步执行器。
用于执行作业的线程池从一个内存队列中获取作业。当队列满了时(可配置),作业将会被解锁,并重新存回数据库。这样,其他的异步执行器就可以重新操作它们。
如果在执行作业期间发生了异常,这个异步作业将会转化为一个定时器作业,并带有一个到期日期。之后,它将会像普通定时器作业一样被获取,并重新变回异步作业,以实现重试。当一个作业已经重试了(可配置)几次,仍然失败,则作业被视为“死亡(dead)”,并被移至ACT_RU_DEADLETTER_JOB表。“死信(deadletter)”的概念在各种其他系统中也广泛使用。管理员需要检查失败作业的异常信息,并进行相应操作。
流程定义与流程实例都可以被暂停。这些定义或实例所关联的暂停作业,将被移至ACT_RU_SUSPENDED_JOB表,以确保用于获取作业的查询语句中的where条件尽量少。
综上所述:对于熟悉作业/异步执行器的旧实现的人来说,主要目标是让查询尽可能简单。在过去(V6以前),所有作业类型/状态使用一张表。为了满足所有使用场景,导致“where”条件十分庞大。现在已经解决了这个问题,我们的跑分也证明这个新设计带来了更好的性能,也更有弹性。
18.1.2. 配置异步执行器
异步执行器是一个高度可配置的组件。建议先查看异步执行器的默认配置,检查它们是否符合你的流程的要求。
另外,也可以扩展默认的实现,或者替换为你自己实现的org.flowable.engine.impl.asyncexecutor.AsyncExecutor接口。
可以在流程引擎配置中使用setter设置下列参数:
名称 | 默认值 | 描述 |
---|---|---|
asyncExecutorThreadPoolQueueSize | 100 | 在获取待执行的作业之后,线程池中某个线程实际执行作业之前,放置这些作业的队列的长度。 |
asyncExecutorCorePoolSize | 2 | 用于执行作业的线程池中,最小的活动(kept alive)线程数量。 |
asyncExecutorMaxPoolSize | 10 | 用于执行作业的线程池中,创建线程的最大数量。 |
asyncExecutorThreadKeepAliveTime | 5000 | 在销毁执行作业所用的线程前,需要保持活动的时间(以毫秒计)。设置为>0的值会消耗资源,但在有大量执行作业的时候,可以避免总是创建新线程。如果设置为0,则线程会在执行完作业后立刻被销毁。 |
asyncExecutorNumberOfRetries | 3 | 作业被移入“死信”表之前的最大重试次数。 |
asyncExecutorMaxTimerJobsPerAcquisition | 1 | 在一次获取定时器作业的查询中,获取作业的数量。默认值为1,因为这样可以降低潜在的乐观锁异常情况。较大的数值性能较好,但在不同的引擎间发生乐观锁异常的几率也会变大。 |
asyncExecutorMaxAsyncJobsDuePerAcquisition | 1 | 在一次获取异步作业的查询中,获取作业的数量。默认值为1,因为这样可以降低潜在的乐观锁异常情况。较大的数值性能较好,但在不同的引擎间发生乐观锁异常的几率也会变大。 |
asyncExecutorDefaultTimerJobAcquireWaitTime | 10000 | 获取定时器作业的线程在两次获取作业的查询之间等待的时间(以毫秒记)。只在上一次查询未找到新的定时器作业,或者获取的作业数量少于asyncExecutorMaxTimerJobsPerAcquisition中设置的值时才会等待。 |
asyncExecutorDefaultAsyncJobAcquireWaitTime | 10000 | 获取异步作业的线程在两次获取作业的查询之间等待的时间(以毫秒记)。只在上一次查询未找到新的异步作业,或者获取的作业数量少于asyncExecutorMaxAsyncJobsDuePerAcquisition中设置的值时才会等待。 |
asyncExecutorDefaultQueueSizeFullWaitTime | 0 | 在内部作业队列已满之后,执行下一次查询之前,获取作业(包括定时器作业及异步作业)的线程将等待的时间(以毫秒记)。默认值为0(为保证向后兼容性)。设置为较大的值,可以让异步执行器有机会多执行掉一些队列作业,腾出队列空间。 |
asyncExecutorTimerLockTimeInMillis | 5分钟 | 异步执行器获取定时器作业之后锁定的时间(以毫秒记)。在这段时间内,其它异步执行器不会尝试获取或锁定这个作业。 |
asyncExecutorAsyncJobLockTimeInMillis | 5分钟 | 异步执行器获取异步作业之后锁定的时间(以毫秒记)。在这段时间内,其它异步执行器不会尝试获取或锁定这个作业。 |
asyncExecutorSecondsToWaitOnShutdown | 60 | 当请求关闭执行器(或流程引擎)后,等待执行作业的线程池安全关闭的时间(以秒记)。 |
asyncExecutorResetExpiredJobsInterval | 60秒 | 在两次超时作业(expired job)检查之间等待的时间(以毫秒记)。超时作业指的是已经锁定(某个执行器已经为其写入了锁持有人以及超时时间),但一直没有完成的作业。在检查中,会解锁超时作业,即移除其锁持有人以及超时时间。这样其他执行器就可以重新获取它。作业的锁超时时间在当前时间之前则视作超时。 |
asyncExecutorResetExpiredJobsPageSize | 3 | 异步执行器的超时重置(reset expired)检查线程一次获取的作业数量。 |
18.1.3. 基于消息队列的异步执行器
阅读异步执行器的设计章节之后,很明显架构的灵感来自消息队列。异步执行器设计思路保证了可以很轻松地用消息队列代替线程池的工作,处理异步作业。
跑分显示,相比基于线程池的异步执行器,消息队列性能出众,吞吐量大。但需要额外的中间件,当然也就增加了安装配置、维护及监控的复杂度。对于多数用户来说,基于线程池的异步执行器性能已经足够用了。但能够知道在性能要求增长之后,仍有改进方案,也是挺好的。
目前,唯一直接可用的是带有JMS的Spring。选择首先支持Spring的原因是,Spring提供了非常好的功能,解决了使用线程以及处理多个消息消费者造成的麻烦。但是其实集成也很简单,因此可以轻松改用任何其他消息队列实现或协议(Stomp、AMPQ等等)。我们欢迎用户反馈下一个应该支持什么消息队列。
使用消息队列后,当引擎创建新的异步作业时,会在消息队列中放入一条包含有作业标识的消息(处在一个事务提交监听器中,这样就可以确保该作业条目已经提交至数据库)。之后消息消费者可以获取作业标识,并获取及执行该作业。异步执行器不再创建线程池,而是会在另一个单独线程中插入及查询定时器。当定时器到时触发时,将会被移至异步作业表,同时向消息队列发送一条消息。消息队列也可能失败,所以超时重置线程会按照原逻辑处理。只不过不是解锁作业,而是重发消息。异步执行器不再轮询异步作业。
主要由两个类实现:
org.flowable.engine.impl.asyncexecutor.JobManager接口的实现,将消息发送至消息队列而不是线程池。
javax.jms.MessageListener接口的实现。从消息队列中消费消息,并使用消息中的作业标识获取及执行该作业。
首先添加flowable-jms-spring-executor依赖:
<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-jms-spring-executor</artifactId>
<version>${flowable.version}</version>
</dependency>
在流程引擎配置中进行如下设置启用基于消息队列的异步执行器:
asyncExecutorActivate为true
asyncExecutorMessageQueueMode为true
org.flowable.spring.executor.jms.MessageBasedJobManager注入为JobManager
下面是一个基于Java配置的完整例子,使用ActiveMQ作为消息中间件。
请注意:
需要为MessageBasedJobManager注入一个配置了正确的connectionFactory的JMSTemplate。
我们使用Spring的MessageListenerContainer,因为它大幅简化了线程与多消费者的使用。
@Configuration
public class SpringJmsConfig {
@Bean
public DataSource dataSource() {
// 略
}
@Bean(name = "transactionManager")
public PlatformTransactionManager transactionManager() {
DataSourceTransactionManager transactionManager = new DataSourceTransactionManager();
transactionManager.setDataSource(dataSource());
return transactionManager;
}
@Bean
public SpringProcessEngineConfiguration processEngineConfiguration() {
SpringProcessEngineConfiguration configuration = new SpringProcessEngineConfiguration();
configuration.setDataSource(dataSource());
configuration.setTransactionManager(transactionManager());
configuration.setDatabaseSchemaUpdate(SpringProcessEngineConfiguration.DB_SCHEMA_UPDATE_TRUE);
configuration.setAsyncExecutorMessageQueueMode(true);
configuration.setAsyncExecutorActivate(true);
configuration.setJobManager(jobManager());
return configuration;
}
@Bean
public ProcessEngine processEngine() {
return processEngineConfiguration().buildProcessEngine();
}
@Bean
public MessageBasedJobManager jobManager() {
MessageBasedJobManager jobManager = new MessageBasedJobManager();
jobManager.setJmsTemplate(jmsTemplate());
return jobManager;
}
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
activeMQConnectionFactory.setUseAsyncSend(true);
activeMQConnectionFactory.setAlwaysSessionAsync(true);
return new CachingConnectionFactory(activeMQConnectionFactory);
}
@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDefaultDestination(new ActiveMQQueue("flowable-jobs"));
jmsTemplate.setConnectionFactory(connectionFactory());
return jmsTemplate;
}
@Bean
public MessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer messageListenerContainer = new DefaultMessageListenerContainer();
messageListenerContainer.setConnectionFactory(connectionFactory());
messageListenerContainer.setDestinationName("flowable-jobs");
messageListenerContainer.setMessageListener(jobMessageListener());
messageListenerContainer.setConcurrentConsumers(2);
messageListenerContainer.start();
return messageListenerContainer;
}
@Bean
public JobMessageListener jobMessageListener() {
JobMessageListener jobMessageListener = new JobMessageListener();
jobMessageListener.setProcessEngineConfiguration(processEngineConfiguration());
return jobMessageListener;
}
}
在上面的代码中,flowable-jms-spring-executor模块提供的只有JobMessageListener与MessageBasedJobManager两个类。其他的所有代码都来自Spring。因此,如果想要替换为其他的队列/协议,就需要替换这些类。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论