EntityManager is closed when executing queries on different threads
I am trying to execute a couple of queries on different threads. There are 2 top-level queries, each executing against a different table at runtime. To execute the first set of queries (executeQuery1()), I spawn 2 threads and they are processed fine. From the output of these queries, I have to extract a list of IDs and then fire another set of queries (executeQuery2()) on entirely different threads. As soon as the second set of queries is about to be submitted to the database, I see that the EntityManager is closed and the application shuts down.
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'springApplicationAdminRegistrar'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'mbeanExporter'
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'defaultValidator'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'org.springframework.data.jpa.util.JpaMetamodelCacheCleanup'
2022-04-28 22:34:29.530 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'threadPoolTaskExecutor'
2022-04-28 22:34:29.530 DEBUG 48403 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'threadPoolTaskExecutor'
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'verticaEntityManagerFactory': [verticaTransactionManager]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'verticaTransactionManager': [transactionTemplate]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'verticaEntityManagerFactory'
2022-04-28 22:34:29.531 INFO 48403 --- [main] c.b.a.n.h.c.VerticaDataSourceConfig$1 : Closing JPA EntityManagerFactory for persistence unit 'vertica'
2022-04-28 22:34:29.531 DEBUG 48403 --- [main] o.hibernate.internal.SessionFactoryImpl : HHH000031: Closing
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.engine.query.spi.QueryPlanCache : Cleaning QueryPlan Cache
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope : Handling #sessionFactoryClosed from [org.hibernate.internal.SessionFactoryImpl@77774571] for TypeConfiguration
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope : Un-scoping TypeConfiguration [org.hibernate.type.spi.TypeConfiguration$Scope@44af588b] from SessionFactory [org.hibernate.internal.SessionFactoryImpl@77774571]
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.s.i.AbstractServiceRegistryImpl : Implicitly destroying ServiceRegistry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.b.r.i.BootstrapServiceRegistryImpl : Implicitly destroying Boot-strap registry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 TRACE 48403 --- [MyAsyncThread-4] j.i.AbstractLogicalConnectionImplementor : Preparing to begin transaction via JDBC Connection.setAutoCommit(false)
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'verticaDataSource': [dataSourceScriptDatabaseInitializer, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration, jdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'dataSourceScriptDatabaseInitializer': [jdbcTemplate, namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'jdbcTemplate': [namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration': [jpaVendorAdapter, entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'jpaVendorAdapter': [entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking close() on bean with name 'verticaDataSource'
2022-04-28 22:34:29.533 INFO 48403 --- [main] com.zaxxer.hikari.HikariDataSource : vertica-db-pool - Shutdown initiated...
2022-04-28 22:34:29.533 DEBUG 48403 --- [main] com.zaxxer.hikari.pool.HikariPool : vertica-db-pool - Before shutdown stats (total=20, active=2, idle=18, waiting=0)
I have 2 datasources in my Spring Boot app and therefore have to configure datasources programmatically.
AsyncConfiguration.java
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("MyAsyncThread-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return new ThreadPoolTaskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}
AsyncService.java
@Slf4j
@Service
public class AsyncService {

    @Autowired VerticaRepository verticaRepo;

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity1>> execute1(String query) {
        List<Entity1> result = verticaRepo.executeQuery1(query);
        return CompletableFuture.completedFuture(result);
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity2>> execute2(List<BigInteger> ids, String query) {
        List<Entity2> result = verticaRepo.executeQuery2(ids, query);
        return CompletableFuture.completedFuture(result);
    }
}
VerticaDataSourceConfig.java
@Configuration
@ConfigurationProperties("vertica.datasource")
@EnableTransactionManagement
@EnableJpaRepositories(
    entityManagerFactoryRef = "verticaEntityManagerFactory",
    transactionManagerRef = "verticaTransactionManager",
    basePackages = { "mypackage.repository" }
)
public class VerticaDataSourceConfig /*extends HikariConfig*/ {

    public final static String PERSISTENCE_UNIT_NAME = "vertica";
    public final static String PACKAGES_TO_SCAN = "mypackage.entity";

    @Autowired
    private Environment env;

    @Bean
    public HikariDataSource verticaDataSource() {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(env.getProperty("vertica.datasource.jdbc-url"));
        hikariConfig.setUsername(env.getProperty("vertica.datasource.username"));
        hikariConfig.setPassword(env.getProperty("vertica.datasource.password"));
        hikariConfig.setDriverClassName(env.getProperty("vertica.datasource.driver-class-name"));
        hikariConfig.setConnectionTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.connectionTimeout")));
        hikariConfig.setIdleTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.idleTimeout")));
        hikariConfig.setMaxLifetime(Long.parseLong(env.getProperty("vertica.datasource.hikari.maxLifetime")));
        hikariConfig.setKeepaliveTime(Long.parseLong(env.getProperty("vertica.datasource.hikari.keepaliveTime")));
        hikariConfig.setMaximumPoolSize(Integer.parseInt(env.getProperty("vertica.datasource.hikari.maximumPoolSize")));
        hikariConfig.setPoolName(env.getProperty("vertica.datasource.hikari.poolName"));
        hikariConfig.setValidationTimeout(Integer.parseInt(env.getProperty("vertica.datasource.hikari.validationTimeout")));
        return new HikariDataSource(hikariConfig);
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean verticaEntityManagerFactory(
            final HikariDataSource verticaDataSource) {
        return new LocalContainerEntityManagerFactoryBean() {{
            setDataSource(verticaDataSource);
            setPersistenceProviderClass(HibernatePersistenceProvider.class);
            setPersistenceUnitName(PERSISTENCE_UNIT_NAME);
            setPackagesToScan(PACKAGES_TO_SCAN);
            Properties jpaProperties = new Properties();
            jpaProperties.put("hibernate.ddl-auto", env.getProperty("vertica.jpa.hibernate.ddl-auto"));
            jpaProperties.put("hibernate.show-sql", env.getProperty("vertica.jpa.hibernate.show-sql"));
            jpaProperties.put("hibernate.format_sql", env.getProperty("vertica.jpa.hibernate.format_sql"));
            jpaProperties.put("hibernate.dialect", env.getProperty("vertica.jpa.properties.hibernate.dialect"));
            setJpaProperties(jpaProperties);
            afterPropertiesSet();
        }};
    }

    @Bean
    public PlatformTransactionManager verticaTransactionManager(EntityManagerFactory verticaEntityManagerFactory) {
        return new JpaTransactionManager(verticaEntityManagerFactory);
    }
}
VerticaRepository.java
@Repository
public class VerticaRepository {

    //@Autowired
    @PersistenceContext(unitName = "vertica")
    private EntityManager em;

    @Transactional
    public List<Entity1> executeQuery1(String queryStr) {
        // query.setParameter() can only replace parameters in the WHERE clause of a query;
        // it cannot replace table or column names
        String replacedQuery = // replace table name and column name
        Query query = em.createNativeQuery(replacedQuery);
        List<Object[]> result = query.getResultList();
        List<Entity1> entities = new ArrayList<>();
        // fill entities list with result
        return entities;
    }

    @Transactional
    public List<Entity2> executeQuery2(List<BigInteger> ids, String queryStr) {
        String replacedQuery = // replace table name and column name; the table and column names are different from the ones in executeQuery1()
        Query query = em.createNativeQuery(replacedQuery);
        List<Object[]> result = query.getResultList();
        List<Entity2> entities = new ArrayList<>();
        // fill entities list with result
        return entities;
    }
}
BusinessService.java
@Slf4j
@Component("businessService")
public class BusinessService {

    @Autowired
    private String query1;
    @Autowired
    private String query2;
    @Autowired private AsyncService asyncService;

    public Void serve() throws Exception {
        List<CompletableFuture<List<Entity1>>> violationFutures = new ArrayList<>();
        for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
            violationFutures.add(asyncService.execute1(query1));
        }
        CompletableFuture<List<List<Entity1>>> vcf = sequence(violationFutures);
        List<Entity1> aggregatedViolations = new ArrayList<>();
        for (List<Entity1> list : vcf.get()) {
            aggregatedViolations.addAll(list);
        }
        int numProcessors = Runtime.getRuntime().availableProcessors();
        List<BigInteger> idList = // somehow get a list of IDs from aggregatedViolations
        List<List<BigInteger>> partitionedList = ListUtils.partition(idList, numProcessors);
        List<CompletableFuture<List<Entity2>>> trendFutures = new ArrayList<>();
        for (List<BigInteger> ids : partitionedList) {
            for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
                trendFutures.add(asyncService.execute2(getIds(devices), query2));
            }
        }
        CompletableFuture<List<List<Entity2>>> tcf = sequence(trendFutures);
        // rest of the business logic is dependent on the above queries execution
        return null;
    }

    // Combines a list of futures into a single future holding all of their results.
    private static <T> CompletableFuture<List<List<T>>> sequence(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> allDoneFuture =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v ->
            futures.stream()
                .map(future -> future.join())
                .collect(Collectors.toList())
        );
    }
}
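The sequence() helper above can be tried outside Spring. The following is a minimal sketch in plain Java, assuming nothing beyond the JDK; the class name SequenceDemo and the flatten() helper are hypothetical names introduced for illustration, and the integer lists stand in for the Entity1 results.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class SequenceDemo {
    // Same shape as the sequence() helper: a list of futures becomes one future of all results.
    static <T> CompletableFuture<List<List<T>>> sequence(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once allDone completes, join() on each future returns immediately.
        return allDone.thenApply(v ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    // Hypothetical helper: flattens per-query result lists, like the aggregatedViolations loop.
    static <T> List<T> flatten(List<List<T>> lists) {
        return lists.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CompletableFuture<List<Integer>>> futures = List.of(
                CompletableFuture.supplyAsync(() -> List.of(1, 2)),
                CompletableFuture.supplyAsync(() -> List.of(3)));
        // join() blocks the calling thread until every future has completed.
        List<Integer> all = flatten(sequence(futures).join());
        System.out.println(all); // prints [1, 2, 3]
    }
}
```

The results come back in the order of the input list, regardless of which future finishes first, because the stream walks the futures list in order.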
I believe this has something to do with EntityManagers being used in a multi-threaded environment. However, from what I read in the documentation, @Transactional will supply a new EM every time. If executeQuery1() was able to run 2 queries in parallel on different threads, why is executeQuery2() closing the EM?
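One thing that may be worth checking: in the log, it is the [main] thread that destroys the beans while MyAsyncThread-4 is only just beginning its transaction, and in serve() as posted, vcf.get() blocks on the first batch but nothing blocks on tcf before the method returns. The sketch below, in plain Java without Spring, shows the pattern of waiting for every future before the shared resources are shut down. The names WaitBeforeShutdown, slowQuery and runAll are hypothetical, and the fixed thread pool is only a stand-in for the threadPoolTaskExecutor bean; this is an assumption about the cause, not a confirmed diagnosis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class WaitBeforeShutdown {
    // Counts the tasks that actually ran to completion.
    static final AtomicInteger completed = new AtomicInteger();

    // Hypothetical stand-in for a slow query such as executeQuery2().
    static List<Integer> slowQuery(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        completed.incrementAndGet();
        return List.of(id);
    }

    // Submits the given number of tasks and returns the total number of result rows.
    static int runAll(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<List<Integer>>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                futures.add(CompletableFuture.supplyAsync(() -> slowQuery(id), pool));
            }
            // Block here until every task is done. Without this join(), the caller
            // would return while tasks are still queued or running.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return futures.stream().mapToInt(f -> f.join().size()).sum();
        } finally {
            // Shut the pool down only after all the work has been waited on.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(8));
    }
}
```

If the application is a short-lived one whose main thread closes the context once serve() returns, the equivalent step in the posted code would be to call tcf.get() or tcf.join() before returning.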