How to submit an Apache Beam Dataflow job to GCP from a Java application

I have a Dataflow job written in Java with Apache Beam. I am able to run the Dataflow job in GCP through these steps:

  1. Created a Dataflow template from my code, then uploaded the template to Cloud Storage (see the sketch after this list).
  2. Created the job directly from the template option available in GCP -> Dataflow -> Jobs.

This flow works fine. I want to do the same step through a Java app: I have an API, and when someone sends a request to that API, I want to start this Dataflow job from the template which I have already stored in Cloud Storage.
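
For context on step 1, staging a classic template is typically done by running the pipeline with the templateLocation option set, roughly like the sketch below. The project, region, bucket, and template names are placeholders, and the pipeline transforms are elided:

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class StageTemplate {
    public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setProject("my-project");                                     // placeholder
        options.setRegion("us-central1");                                     // placeholder
        options.setTempLocation("gs://my-bucket/temp");                       // placeholder
        options.setTemplateLocation("gs://my-bucket/templates/my-template");  // placeholder

        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline transforms here ...

        // With templateLocation set, run() stages the template to Cloud Storage
        // instead of launching a job.
        pipeline.run();
    }
}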

I can see a REST API is available to implement this approach, as below:

POST /v1b3/projects/project_id/locations/loc/templates:launch?gcsPath=template-location
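
For illustration, a raw call to that endpoint might look like this minimal sketch, assuming the google-auth-library dependency (not in the pom above) for the OAuth token and Java 11's built-in HttpClient. The project, location, job name, and gcsPath values are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import com.google.auth.oauth2.GoogleCredentials;

public class RawTemplateLaunch {
    public static void main(String[] args) throws Exception {
        // Obtain an OAuth2 access token from Application Default Credentials.
        GoogleCredentials credentials = GoogleCredentials.getApplicationDefault()
                .createScoped("https://www.googleapis.com/auth/cloud-platform");
        credentials.refreshIfExpired();
        String token = credentials.getAccessToken().getTokenValue();

        // The request body is a LaunchTemplateParameters object in JSON.
        String body = "{\"jobName\":\"test-job\",\"parameters\":{\"bigtableEmulatorPort\":\"-1\"}}";

        // gcsPath (the template location) goes in the query string, not the body.
        String url = "https://dataflow.googleapis.com/v1b3/projects/my-project"
                + "/locations/us-central1/templates:launch"
                + "?gcsPath=gs://my-bucket/templates/my-template";

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer " + token)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}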

But I didn't find any reference or samples for calling it through the Java client library, so I tried the approach below. In my Spring Boot project I added this dependency:

<!-- https://mvnrepository.com/artifact/com.google.apis/google-api-services-dataflow -->
        <dependency>
            <groupId>com.google.apis</groupId>
            <artifactId>google-api-services-dataflow</artifactId>
            <version>v1b3-rev20210825-1.32.1</version>
        </dependency>

and added the code below in the controller:

public static void createJob() throws IOException {
    GoogleCredential credential = GoogleCredential.fromStream(new FileInputStream("myCertKey.json"))
        .createScoped(java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform"));
    try {
        Dataflow dataflow = new Dataflow.Builder(new LowLevelHttpRequest(), new JacksonFactory(),
            credential).setApplicationName("my-job").build();   // <-- this line gives the error

        // RuntimeEnvironment
        RuntimeEnvironment env = new RuntimeEnvironment();
        env.setBypassTempDirValidation(false);
        // all my env configs added

        // parameters
        HashMap<String, String> params = new HashMap<>();
        params.put("bigtableEmulatorPort", "-1");
        params.put("gcsPath", "gs://bucket//my.json");
        // all other params

        LaunchTemplateParameters content = new LaunchTemplateParameters();
        content.setJobName("Test-job");
        content.setEnvironment(env);
        content.setParameters(params);

        dataflow.projects().locations().templates().launch("project-id", "location", content);
    } catch (Exception e) {
        log.info("error occurred", e);
    }
}

This gives {"id":null,"message":"'boolean com.google.api.client.http.HttpTransport.isMtls()'"}

error in this line itself

Dataflow dataflow = new Dataflow.Builder(new LowLevelHttpRequest(), new JacksonFactory(),
    credential).setApplicationName("my-job").build();

This is because the Dataflow builder expects an HttpTransport as its first argument, but I passed a LowLevelHttpRequest().
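
For reference, below is a minimal sketch of what the same launch might look like with a real HttpTransport from GoogleNetHttpTransport instead of LowLevelHttpRequest, keeping the (deprecated) GoogleCredential from the snippet above. The project, region, and template path are placeholders:

import java.io.FileInputStream;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.Collections;
import java.util.HashMap;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;

public static void createJob() throws IOException, GeneralSecurityException {
    GoogleCredential credential = GoogleCredential.fromStream(new FileInputStream("myCertKey.json"))
        .createScoped(Collections.singletonList("https://www.googleapis.com/auth/cloud-platform"));

    // A real HttpTransport, which is what the builder expects.
    HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();

    Dataflow dataflow = new Dataflow.Builder(transport, JacksonFactory.getDefaultInstance(), credential)
        .setApplicationName("my-job")
        .build();

    RuntimeEnvironment env = new RuntimeEnvironment();
    env.setBypassTempDirValidation(false);

    HashMap<String, String> params = new HashMap<>();
    params.put("bigtableEmulatorPort", "-1");

    LaunchTemplateParameters content = new LaunchTemplateParameters();
    content.setJobName("test-job");
    content.setEnvironment(env);
    content.setParameters(params);

    // launch(...) only builds the request; gcsPath is set as a query parameter,
    // and execute() actually sends the call.
    LaunchTemplateResponse response = dataflow.projects().locations().templates()
        .launch("my-project", "us-central1", content)        // placeholders
        .setGcsPath("gs://my-bucket/templates/my-template")  // placeholder
        .execute();

    System.out.println("Launched job: " + response.getJob().getId());
}

Note that if the isMtls() error still appears with a proper transport, it usually indicates mismatched google-api-client / google-http-client versions on the classpath, so aligning them (for example via the Google Cloud libraries BOM) may also be needed.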

I am not sure whether this is the correct way to implement this. Can anyone suggest ideas on how to implement it? Any examples or references?

Thanks a lot :)
