EntityManager在执行不同线程的查询时关闭

发布于 2025-01-25 07:55:27 字数 13380 浏览 2 评论 0原文

我正在尝试在不同的线程上执行一些查询。在运行时,每个表上都有2个顶级查询。为了执行第一组查询(executeQuery1()),我催生了2个不同的线程,并且对它们进行了很好的处理。从这些查询的输出中,我必须提取ID列表,然后在完全不同的线程上启动另一组查询(executeQuery2())。一旦将第二组查询提交到数据库,我就会看到EntityManager已关闭并关闭应用程序。

2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'springApplicationAdminRegistrar'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'mbeanExporter'
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'defaultValidator'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'org.springframework.data.jpa.util.JpaMetamodelCacheCleanup'
2022-04-28 22:34:29.530 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'threadPoolTaskExecutor'
2022-04-28 22:34:29.530 DEBUG 48403 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'threadPoolTaskExecutor'
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'verticaEntityManagerFactory': [verticaTransactionManager]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'verticaTransactionManager': [transactionTemplate]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'verticaEntityManagerFactory'
2022-04-28 22:34:29.531  INFO 48403 --- [main] c.b.a.n.h.c.VerticaDataSourceConfig$1    : Closing JPA EntityManagerFactory for persistence unit 'vertica'
2022-04-28 22:34:29.531 DEBUG 48403 --- [main] o.hibernate.internal.SessionFactoryImpl  : HHH000031: Closing
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.engine.query.spi.QueryPlanCache      : Cleaning QueryPlan Cache
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope     : Handling #sessionFactoryClosed from [org.hibernate.internal.SessionFactoryImpl@77774571] for TypeConfiguration
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope     : Un-scoping TypeConfiguration [org.hibernate.type.spi.TypeConfiguration$Scope@44af588b] from SessionFactory [org.hibernate.internal.SessionFactoryImpl@77774571]
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.s.i.AbstractServiceRegistryImpl      : Implicitly destroying ServiceRegistry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.b.r.i.BootstrapServiceRegistryImpl   : Implicitly destroying Boot-strap registry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 TRACE 48403 --- [MyAsyncThread-4] j.i.AbstractLogicalConnectionImplementor : Preparing to begin transaction via JDBC Connection.setAutoCommit(false)
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'verticaDataSource': [dataSourceScriptDatabaseInitializer, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration, jdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'dataSourceScriptDatabaseInitializer': [jdbcTemplate, namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'jdbcTemplate': [namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration': [jpaVendorAdapter, entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'jpaVendorAdapter': [entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking close() on bean with name 'verticaDataSource'
2022-04-28 22:34:29.533  INFO 48403 --- [main] com.zaxxer.hikari.HikariDataSource       : vertica-db-pool - Shutdown initiated...
2022-04-28 22:34:29.533 DEBUG 48403 --- [main] com.zaxxer.hikari.pool.HikariPool        : vertica-db-pool - Before shutdown stats (total=20, active=2, idle=18, waiting=0)

我的春季启动应用程序中有2个数据源,因此必须以编程方式配置数据源。

asyncconfiguration.java

@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("MyAsyncThread-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return new ThreadPoolTaskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

asyncservice.java

@Slf4j
@Service
public class AsyncService {

    @Autowired VerticaRepository verticaRepo;

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity1>> execute1(String query) {
        List<Entity1> result = verticaRepo.executeQuery1(query);
        return CompletableFuture.completedFuture(result);
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity2>> execute2(List<BigInteger> ids, String query) {
        List<Entity2> result = verticaRepo.executeQuery2(ids, query);
        return CompletableFuture.completedFuture(result);
    }
}

verticadatasourceconfig.java

@Configuration
@ConfigurationProperties("vertica.datasource")
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "verticaEntityManagerFactory",
        transactionManagerRef = "verticaTransactionManager",
        basePackages = { "mypackage.repository" }
)
public class VerticaDataSourceConfig /*extends HikariConfig*/ {
    public final static String PERSISTENCE_UNIT_NAME = "vertica";
    public final static String PACKAGES_TO_SCAN = "mypackage.entity";

    @Autowired
    private Environment env;

    @Bean
    public HikariDataSource verticaDataSource() {
        HikariConfig hikariConfig = new HikariConfig();

        hikariConfig.setJdbcUrl(env.getProperty("vertica.datasource.jdbc-url"));
        hikariConfig.setUsername(env.getProperty("vertica.datasource.username"));
        hikariConfig.setPassword(env.getProperty("vertica.datasource.password"));
        hikariConfig.setDriverClassName(env.getProperty("vertica.datasource.driver-class-name"));

        hikariConfig.setConnectionTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.connectionTimeout")));
        hikariConfig.setIdleTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.idleTimeout")));
        hikariConfig.setMaxLifetime(Long.parseLong(env.getProperty("vertica.datasource.hikari.maxLifetime")));
        hikariConfig.setKeepaliveTime(Long.parseLong(env.getProperty("vertica.datasource.hikari.keepaliveTime")));
        hikariConfig.setMaximumPoolSize(Integer.parseInt(env.getProperty("vertica.datasource.hikari.maximumPoolSize")));
        hikariConfig.setPoolName(env.getProperty("vertica.datasource.hikari.poolName"));
        hikariConfig.setValidationTimeout(Integer.parseInt(env.getProperty("vertica.datasource.hikari.validationTimeout")));

        return new HikariDataSource(hikariConfig);
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean verticaEntityManagerFactory(
            final HikariDataSource verticaDataSource) {

        return new LocalContainerEntityManagerFactoryBean() {{
            setDataSource(verticaDataSource);
            setPersistenceProviderClass(HibernatePersistenceProvider.class);
            setPersistenceUnitName(PERSISTENCE_UNIT_NAME);
            setPackagesToScan(PACKAGES_TO_SCAN);

            Properties jpaProperties = new Properties();
            jpaProperties.put("hibernate.ddl-auto", env.getProperty("vertica.jpa.hibernate.ddl-auto"));
            jpaProperties.put("hibernate.show-sql", env.getProperty("vertica.jpa.hibernate.show-sql"));
            jpaProperties.put("hibernate.format_sql", env.getProperty("vertica.jpa.hibernate.format_sql"));
            jpaProperties.put("hibernate.dialect", env.getProperty("vertica.jpa.properties.hibernate.dialect"));

            setJpaProperties(jpaProperties);

            afterPropertiesSet();;
        }};
    }

    @Bean
    public PlatformTransactionManager verticaTransactionManager(EntityManagerFactory verticaEntityManagerFactory) {
        return new JpaTransactionManager(verticaEntityManagerFactory);
    }
}

verticarepository.java

@Repository
public class VerticaRepository {
    //@Autowired
    @PersistenceContext(unitName = "vertica")
    private EntityManager em;

    @Transactional
    public List<Entity1> executeQuery1(String queryStr) {
        // query.setParameter() can only replace parameters in WHERE clause of a query;
        // it cannot replace table or column names
        String replacedQuery = // replace table name and column name
        Query query = em.createNativeQuery(replacedQuery);
        

        List<Object[]> result = query.getResultList();

        List<Entity1> entities = new ArrayList<>();

        // fill entities list with result

        return entities;
    }

    @Transactional
    public List<Entity2> executeQuery2(List<BigInteger> ids, String queryStr) {

        String replacedQuery = // replace table name and column name; the table and col names are different from the ones in executeQWuery1()
        Query query = em.createNativeQuery(replacedQuery);
        

        List<Object[]> result = query.getResultList();

        List<Entity2> entities = new ArrayList<>();

        // fill entities list with result

        return entities;
    }
}

businessService.java

@Slf4j
@Component("businessService")
public class BusinessService {

    @Autowired
    private String query1;

    @Autowired
    private String query2;

    @Autowired private AsyncService asyncService;

    public Void serve() throws Exception {
        
        List<CompletableFuture<List<Entity1>>> violationFutures = new ArrayList<>();
        
        for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
            violationFutures.add(asyncService.execute1(query1));
        }

        CompletableFuture<List<List<Entity1>>> vcf = sequence(violationFutures);
        List<Entity1> aggregatedViolations = new ArrayList<>();
        for (List<Entity1> list: vcf.get()) {
            aggregatedViolations.addAll(list);
        }

        int numProcessors = Runtime.getRuntime().availableProcessors();
        List<BigInteger> idList= //somehow get a list of ids from aggregatedViolations
        List<List<BigInteger>> partitionedList = ListUtils.partition(idList, numProcessors);

        List<CompletableFuture<List<Entity2>>> trendFutures = new ArrayList<>();

        for (List<BigInteger> ids: partitionedList) {
            for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
                trendFutures.add(asyncService.execute2(getIds(devices), query2));
            }
        }

        CompletableFuture<List<List<Entity2>>> tcf = sequence(trendFutures);

        // rest of the business logic is dependent on the above queries execution

        return null;
    }

    private static <T> CompletableFuture<List<List<T>>> sequence(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> allDoneFuture =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v ->
                futures.stream().
                        map(future -> future.join()).
                        collect(Collectors.toList())
        );
    }

我相信这与在多线程中使用的EntityManager有关。但是,当我阅读文档时,@transactional每次都会提供新的EM。如果executeQuery1()能够在不同线程上并行运行2个查询,为什么executeQuery2()关闭EM?

I am trying to execute a couple of queries on different threads. There are 2 top level queries each executing on different tables at runtime. For executing the first set of queries (executeQuery1()), I spawn 2 different threads and they are processed well. From the output of these queries, I have to extract a list of ids and then fire another set of queries (executeQuery2()) on entirely different threads. As soon as the second set of queries are about to be submitted to the database, I see that EntityManager is closed and the application shutdown.

2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'springApplicationAdminRegistrar'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'mbeanExporter'
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans on shutdown
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter        : Unregistering JMX-exposed beans
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'defaultValidator'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'org.springframework.data.jpa.util.JpaMetamodelCacheCleanup'
2022-04-28 22:34:29.530 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'threadPoolTaskExecutor'
2022-04-28 22:34:29.530 DEBUG 48403 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'threadPoolTaskExecutor'
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'verticaEntityManagerFactory': [verticaTransactionManager]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'verticaTransactionManager': [transactionTemplate]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking destroy() on bean with name 'verticaEntityManagerFactory'
2022-04-28 22:34:29.531  INFO 48403 --- [main] c.b.a.n.h.c.VerticaDataSourceConfig$1    : Closing JPA EntityManagerFactory for persistence unit 'vertica'
2022-04-28 22:34:29.531 DEBUG 48403 --- [main] o.hibernate.internal.SessionFactoryImpl  : HHH000031: Closing
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.engine.query.spi.QueryPlanCache      : Cleaning QueryPlan Cache
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope     : Handling #sessionFactoryClosed from [org.hibernate.internal.SessionFactoryImpl@77774571] for TypeConfiguration
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope     : Un-scoping TypeConfiguration [org.hibernate.type.spi.TypeConfiguration$Scope@44af588b] from SessionFactory [org.hibernate.internal.SessionFactoryImpl@77774571]
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.s.i.AbstractServiceRegistryImpl      : Implicitly destroying ServiceRegistry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.b.r.i.BootstrapServiceRegistryImpl   : Implicitly destroying Boot-strap registry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 TRACE 48403 --- [MyAsyncThread-4] j.i.AbstractLogicalConnectionImplementor : Preparing to begin transaction via JDBC Connection.setAutoCommit(false)
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'verticaDataSource': [dataSourceScriptDatabaseInitializer, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration, jdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'dataSourceScriptDatabaseInitializer': [jdbcTemplate, namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'jdbcTemplate': [namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration': [jpaVendorAdapter, entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'jpaVendorAdapter': [entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter    : Invoking close() on bean with name 'verticaDataSource'
2022-04-28 22:34:29.533  INFO 48403 --- [main] com.zaxxer.hikari.HikariDataSource       : vertica-db-pool - Shutdown initiated...
2022-04-28 22:34:29.533 DEBUG 48403 --- [main] com.zaxxer.hikari.pool.HikariPool        : vertica-db-pool - Before shutdown stats (total=20, active=2, idle=18, waiting=0)

I have 2 datasources in my Spring Boot app and therefore have to configure datasources programmatically.

AsyncConfiguration.java

@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("MyAsyncThread-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return new ThreadPoolTaskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}

AsyncService.java

@Slf4j
@Service
public class AsyncService {

    @Autowired VerticaRepository verticaRepo;

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity1>> execute1(String query) {
        List<Entity1> result = verticaRepo.executeQuery1(query);
        return CompletableFuture.completedFuture(result);
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity2>> execute2(List<BigInteger> ids, String query) {
        List<Entity2> result = verticaRepo.executeQuery2(ids, query);
        return CompletableFuture.completedFuture(result);
    }
}

VerticaDataSourceConfig.java

@Configuration
@ConfigurationProperties("vertica.datasource")
@EnableTransactionManagement
@EnableJpaRepositories(
        entityManagerFactoryRef = "verticaEntityManagerFactory",
        transactionManagerRef = "verticaTransactionManager",
        basePackages = { "mypackage.repository" }
)
public class VerticaDataSourceConfig /*extends HikariConfig*/ {
    public final static String PERSISTENCE_UNIT_NAME = "vertica";
    public final static String PACKAGES_TO_SCAN = "mypackage.entity";

    @Autowired
    private Environment env;

    @Bean
    public HikariDataSource verticaDataSource() {
        HikariConfig hikariConfig = new HikariConfig();

        hikariConfig.setJdbcUrl(env.getProperty("vertica.datasource.jdbc-url"));
        hikariConfig.setUsername(env.getProperty("vertica.datasource.username"));
        hikariConfig.setPassword(env.getProperty("vertica.datasource.password"));
        hikariConfig.setDriverClassName(env.getProperty("vertica.datasource.driver-class-name"));

        hikariConfig.setConnectionTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.connectionTimeout")));
        hikariConfig.setIdleTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.idleTimeout")));
        hikariConfig.setMaxLifetime(Long.parseLong(env.getProperty("vertica.datasource.hikari.maxLifetime")));
        hikariConfig.setKeepaliveTime(Long.parseLong(env.getProperty("vertica.datasource.hikari.keepaliveTime")));
        hikariConfig.setMaximumPoolSize(Integer.parseInt(env.getProperty("vertica.datasource.hikari.maximumPoolSize")));
        hikariConfig.setPoolName(env.getProperty("vertica.datasource.hikari.poolName"));
        hikariConfig.setValidationTimeout(Integer.parseInt(env.getProperty("vertica.datasource.hikari.validationTimeout")));

        return new HikariDataSource(hikariConfig);
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean verticaEntityManagerFactory(
            final HikariDataSource verticaDataSource) {

        return new LocalContainerEntityManagerFactoryBean() {{
            setDataSource(verticaDataSource);
            setPersistenceProviderClass(HibernatePersistenceProvider.class);
            setPersistenceUnitName(PERSISTENCE_UNIT_NAME);
            setPackagesToScan(PACKAGES_TO_SCAN);

            Properties jpaProperties = new Properties();
            jpaProperties.put("hibernate.ddl-auto", env.getProperty("vertica.jpa.hibernate.ddl-auto"));
            jpaProperties.put("hibernate.show-sql", env.getProperty("vertica.jpa.hibernate.show-sql"));
            jpaProperties.put("hibernate.format_sql", env.getProperty("vertica.jpa.hibernate.format_sql"));
            jpaProperties.put("hibernate.dialect", env.getProperty("vertica.jpa.properties.hibernate.dialect"));

            setJpaProperties(jpaProperties);

            afterPropertiesSet();;
        }};
    }

    @Bean
    public PlatformTransactionManager verticaTransactionManager(EntityManagerFactory verticaEntityManagerFactory) {
        return new JpaTransactionManager(verticaEntityManagerFactory);
    }
}

VerticaRepository.java

@Repository
public class VerticaRepository {
    //@Autowired
    @PersistenceContext(unitName = "vertica")
    private EntityManager em;

    @Transactional
    public List<Entity1> executeQuery1(String queryStr) {
        // query.setParameter() can only replace parameters in WHERE clause of a query;
        // it cannot replace table or column names
        String replacedQuery = // replace table name and column name
        Query query = em.createNativeQuery(replacedQuery);
        

        List<Object[]> result = query.getResultList();

        List<Entity1> entities = new ArrayList<>();

        // fill entities list with result

        return entities;
    }

    @Transactional
    public List<Entity2> executeQuery2(List<BigInteger> ids, String queryStr) {

        String replacedQuery = // replace table name and column name; the table and col names are different from the ones in executeQWuery1()
        Query query = em.createNativeQuery(replacedQuery);
        

        List<Object[]> result = query.getResultList();

        List<Entity2> entities = new ArrayList<>();

        // fill entities list with result

        return entities;
    }
}

BusinessService.java

@Slf4j
@Component("businessService")
public class BusinessService {

    @Autowired
    private String query1;

    @Autowired
    private String query2;

    @Autowired private AsyncService asyncService;

    public Void serve() throws Exception {
        
        List<CompletableFuture<List<Entity1>>> violationFutures = new ArrayList<>();
        
        for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
            violationFutures.add(asyncService.execute1(query1));
        }

        CompletableFuture<List<List<Entity1>>> vcf = sequence(violationFutures);
        List<Entity1> aggregatedViolations = new ArrayList<>();
        for (List<Entity1> list: vcf.get()) {
            aggregatedViolations.addAll(list);
        }

        int numProcessors = Runtime.getRuntime().availableProcessors();
        List<BigInteger> idList= //somehow get a list of ids from aggregatedViolations
        List<List<BigInteger>> partitionedList = ListUtils.partition(idList, numProcessors);

        List<CompletableFuture<List<Entity2>>> trendFutures = new ArrayList<>();

        for (List<BigInteger> ids: partitionedList) {
            for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
                trendFutures.add(asyncService.execute2(getIds(devices), query2));
            }
        }

        CompletableFuture<List<List<Entity2>>> tcf = sequence(trendFutures);

        // rest of the business logic is dependent on the above queries execution

        return null;
    }

    private static <T> CompletableFuture<List<List<T>>> sequence(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> allDoneFuture =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v ->
                futures.stream().
                        map(future -> future.join()).
                        collect(Collectors.toList())
        );
    }

I believe that this has something to do with the EntityManagers being used in multi-threaded env. However, when I read the documentation, @Transactional will supply a new EM everytime. If the executeQuery1() was able to run 2 queries in parallel on different threads, why is executeQuery2() closing the EM?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文