Hazelcast Jet Job无法访问春季环境

发布于 2025-02-10 02:16:16 字数 4068 浏览 0 评论 0原文

我正在创建一个Spring Boot应用程序,在我的应用程序中,我正在尝试在正常的Spring应用程序中使用Hazelcast Jet,它运行正常,但是在Spring Boot应用程序中尝试时,它无法访问Bean。豆子正在动态创建,因此我可以通过AppBeans类访问它们。

对于依赖关系,我已经采用了3:

<dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-all</artifactId>
        <version>4.2</version>
    </dependency>
    <dependency>
        <groupId>com.hazelcast.jet</groupId>
        <artifactId>hazelcast-jet</artifactId>
        <version>4.5</version>
    </dependency>
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-spring</artifactId>
        <version>4.2</version>
    </dependency>

对于创建Hazelcast实例,我正在使用以下代码:

@SpringAware
public class DataFactory implements Serializable {
   SpringManagedContext springManagedContext = (SpringManagedContext) AppBeans.getBean("springManagedContext");

public JetInstance buildJetInstance() {
    if (!EtlObjects.jetStart) {
        
    EtlObjects.jetStart = true;
    JetConfig jetConfig = new JetConfig();
    jetConfig.getHazelcastConfig().setProperty( "hazelcast.logging.type", "log4j" );
    jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
    jetConfig.configureHazelcast(c -> {
        c.getNetworkConfig().setReuseAddress(true);
        c.setClusterName(UUID.randomUUID().toString());
        c.setManagedContext(springManagedContext);
        c.getNetworkConfig().setPort(9493);
        c.getNetworkConfig().setPublicAddress("localhost");
        c.getNetworkConfig().setPortAutoIncrement(true);
        c.getNetworkConfig().getJoin().getAutoDetectionConfig().setEnabled(false);
        c.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
        c.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true).setMembers(Arrays.asList(new String[] {"localhost"}));
        
    });
    EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);

    }
    return EtlObjects.jetInstance;
 }
}

在工作中,当我尝试访问bean时,它给了我null:

@Component
public class JDBCDataSource implements ISourceBatch, Serializable {
        @Override
         public BatchStage<Object> readSource(Pipeline pipeline) {
                 BatchSource<Object> jdbcSource = Sources
                .jdbc(() -> {

                 **> //Here it giving null cannot access datasource Bean**

                    Connection conn = ((DataSource)AppBeans.getBean(thissource.get("datasourceName").toString())).getConnection();
                    return conn;
                },
                    (con, parallelism, index) -> {
                     ..........my code
                   }});
         }
}

AppBeans类也如下:

@Component("s")
public class AppBeans implements ApplicationContextAware, 
ServletContextListener,BeanDefinitionRegistryPostProcessor,Serializable{
private static ApplicationContext CONTEXT;
private static ServletContext SERVLETCONTEXT;
private static BeanDefinitionRegistry REGISTRY;
private transient AutowireCapableBeanFactory beanFactory;


public void setApplicationContext(ApplicationContext context) throws BeansException {
   
    CONTEXT = context;
    beanFactory = context.getAutowireCapableBeanFactory();
 }

 public static Object getBean(String beanName) {
    return CONTEXT.getBean(beanName);
  }

}

也用于启动应用程序:

@SpringBootApplication(exclude = {MongoAutoConfiguration.class, MongoDataAutoConfiguration.class, FreeMarkerAutoConfiguration.class})
@ComponentScan(basePackages =  {"com"})
@EnableAutoConfiguration
public class AppApplication extends SpringBootServletInitializer {

    public static void main(String[] args) {
    SpringApplication.run(AppApplication.class, args);
   }
   @Bean
   public SpringManagedContext springManagedContext() {
     return new SpringManagedContext();
   }
}

请让我让我知道如何喷射来源访问春季上下文。让我知道是否没有什么清楚

I am createing a spring boot application and in my application I am trying to use Hazelcast Jet in normal spring application it is working properly but while trying in spring boot application it is not able to access the beans. Beans are getting created dynamically so I have access them through AppBeans class.

For dependency I have take this 3:

<dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-all</artifactId>
        <version>4.2</version>
    </dependency>
    <dependency>
        <groupId>com.hazelcast.jet</groupId>
        <artifactId>hazelcast-jet</artifactId>
        <version>4.5</version>
    </dependency>
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-spring</artifactId>
        <version>4.2</version>
    </dependency>

For create Hazelcast Instance I am using below code:

@SpringAware
public class DataFactory implements Serializable {
   SpringManagedContext springManagedContext = (SpringManagedContext) AppBeans.getBean("springManagedContext");

public JetInstance buildJetInstance() {
    if (!EtlObjects.jetStart) {
        
    EtlObjects.jetStart = true;
    JetConfig jetConfig = new JetConfig();
    jetConfig.getHazelcastConfig().setProperty( "hazelcast.logging.type", "log4j" );
    jetConfig.getInstanceConfig().setCooperativeThreadCount(5);
    jetConfig.configureHazelcast(c -> {
        c.getNetworkConfig().setReuseAddress(true);
        c.setClusterName(UUID.randomUUID().toString());
        c.setManagedContext(springManagedContext);
        c.getNetworkConfig().setPort(9493);
        c.getNetworkConfig().setPublicAddress("localhost");
        c.getNetworkConfig().setPortAutoIncrement(true);
        c.getNetworkConfig().getJoin().getAutoDetectionConfig().setEnabled(false);
        c.getNetworkConfig().getJoin().getMulticastConfig().setEnabled(false);
        c.getNetworkConfig().getJoin().getTcpIpConfig().setEnabled(true).setMembers(Arrays.asList(new String[] {"localhost"}));
        
    });
    EtlObjects.jetInstance = Jet.newJetInstance(jetConfig);

    }
    return EtlObjects.jetInstance;
 }
}

And in job when I am trying to access the beans it is giving me null:

@Component
public class JDBCDataSource implements ISourceBatch, Serializable {
        @Override
         public BatchStage<Object> readSource(Pipeline pipeline) {
                 BatchSource<Object> jdbcSource = Sources
                .jdbc(() -> {

                 **> //Here it giving null cannot access datasource Bean**

                    Connection conn = ((DataSource)AppBeans.getBean(thissource.get("datasourceName").toString())).getConnection();
                    return conn;
                },
                    (con, parallelism, index) -> {
                     ..........my code
                   }});
         }
}

And AppBeans class is as follow:

@Component("s")
public class AppBeans implements ApplicationContextAware, 
ServletContextListener,BeanDefinitionRegistryPostProcessor,Serializable{
private static ApplicationContext CONTEXT;
private static ServletContext SERVLETCONTEXT;
private static BeanDefinitionRegistry REGISTRY;
private transient AutowireCapableBeanFactory beanFactory;


public void setApplicationContext(ApplicationContext context) throws BeansException {
   
    CONTEXT = context;
    beanFactory = context.getAutowireCapableBeanFactory();
 }

 public static Object getBean(String beanName) {
    return CONTEXT.getBean(beanName);
  }

}

Also for starting application:

@SpringBootApplication(exclude = {MongoAutoConfiguration.class, MongoDataAutoConfiguration.class, FreeMarkerAutoConfiguration.class})
@ComponentScan(basePackages =  {"com"})
@EnableAutoConfiguration
public class AppApplication extends SpringBootServletInitializer {

    public static void main(String[] args) {
    SpringApplication.run(AppApplication.class, args);
   }
   @Bean
   public SpringManagedContext springManagedContext() {
     return new SpringManagedContext();
   }
}

Please let me know how can I Jet Sources give access to spring context. Let me know if anything is not clear

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

云朵有点甜 2025-02-17 02:16:16

您需要将春季托管上下文传递给作业源,然后可以使用Spring的应用程序上下文初始化。

尝试

    static Pipeline buildPipeline() {
        return Pipeline.create()
                .readFrom(mySource())
                .writeTo(Sinks.logger())
                .getPipeline();
    }
    
    static BatchSource<String> mySource() {
        return SourceBuilder.batch("72743077", 
                        jobContext -> new MyBatchSource(jobContext.hazelcastInstance().getConfig().getManagedContext()))
                    .fillBufferFn(MyBatchSource::fillBufferFn)
                    .build();
    }

    @SpringAware
    static class MyBatchSource implements ApplicationContextAware {
        private ApplicationContext applicationContext;
        
        MyBatchSource(ManagedContext managedContext) {
            if (managedContext instanceof SpringManagedContext) {
                SpringManagedContext springManagedContext = (SpringManagedContext) managedContext;
                springManagedContext.initialize(this);
            }
        }
        
        void fillBufferFn(SourceBuilder.SourceBuffer<String> buffer) {
            Config config = this.applicationContext.getBean(Config.class);
            buffer.add("HELLO FROM " + config.getClusterName());
            buffer.close();
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

使其正常工作,请确保在创建服务器实例之前将弹簧托管上下文插入Hazelcast配置中。

    @Bean
    public SpringManagedContext managedContext() {
        return new SpringManagedContext();
    }
    
    @Bean
    public Config config(ManagedContext managedContext) {
        Config config = new Config();
        config.setManagedContext(managedContext);

另外,如果您提高到5.1.2或最新内容,则只需要com.hazelcast:Hazelcast-spring:5.1.2作为依赖项。

You need to pass the Spring Managed Context to the job source, which can then initialize with Spring's Application Context.

Try

    static Pipeline buildPipeline() {
        return Pipeline.create()
                .readFrom(mySource())
                .writeTo(Sinks.logger())
                .getPipeline();
    }
    
    static BatchSource<String> mySource() {
        return SourceBuilder.batch("72743077", 
                        jobContext -> new MyBatchSource(jobContext.hazelcastInstance().getConfig().getManagedContext()))
                    .fillBufferFn(MyBatchSource::fillBufferFn)
                    .build();
    }

    @SpringAware
    static class MyBatchSource implements ApplicationContextAware {
        private ApplicationContext applicationContext;
        
        MyBatchSource(ManagedContext managedContext) {
            if (managedContext instanceof SpringManagedContext) {
                SpringManagedContext springManagedContext = (SpringManagedContext) managedContext;
                springManagedContext.initialize(this);
            }
        }
        
        void fillBufferFn(SourceBuilder.SourceBuffer<String> buffer) {
            Config config = this.applicationContext.getBean(Config.class);
            buffer.add("HELLO FROM " + config.getClusterName());
            buffer.close();
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

To make it work, ensure the Spring Managed Context is plugged into the Hazelcast configuration before creating the server instance.

    @Bean
    public SpringManagedContext managedContext() {
        return new SpringManagedContext();
    }
    
    @Bean
    public Config config(ManagedContext managedContext) {
        Config config = new Config();
        config.setManagedContext(managedContext);

Also, if you advance to 5.1.2 or whatever is the latest, you only need com.hazelcast:hazelcast-spring:5.1.2 as a dependency.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文