How to submit an Apache Beam Dataflow job to GCP from a Java application
I have a Dataflow job written in Apache Beam with Java. I am able to run the job in GCP through these steps:
- Created a Dataflow template from my code, then uploaded the template to Cloud Storage.
- Created the job directly from the template option available in GCP -> Dataflow -> Jobs.
This flow is working fine.
I want to do the same through a Java app: I have an API, and when someone sends a request to that API, I want to start this Dataflow job from the template I have already stored in Cloud Storage.
I could see that a REST API is available to implement this approach, as below:
POST /v1b3/projects/project_id/locations/loc/templates:launch?gcsPath=template-location
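To make sure I understand the endpoint, here is roughly what I think the raw REST call would look like from plain Java. This is an untested sketch: PROJECT_ID, LOCATION, and the gcsPath value are placeholders, and it assumes Java 11+ (for java.net.http) plus google-auth-library-oauth2-http for the access token.

import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;

public class RawTemplateLaunch {
    public static void main(String[] args) throws Exception {
        // Get an OAuth2 access token from the service account key file.
        GoogleCredentials credentials = GoogleCredentials
                .fromStream(new FileInputStream("myCertKey.json"))
                .createScoped(List.of("https://www.googleapis.com/auth/cloud-platform"));
        credentials.refreshIfExpired();
        String token = credentials.getAccessToken().getTokenValue();

        // templates:launch takes the template's GCS path as a query parameter;
        // jobName, parameters, and environment go in the JSON body.
        String url = "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID"
                + "/locations/LOCATION/templates:launch"
                + "?gcsPath=gs://my-bucket/templates/my-template"; // placeholder path
        String body = "{\"jobName\":\"test-job\","
                + "\"parameters\":{\"bigtableEmulatorPort\":\"-1\"},"
                + "\"environment\":{\"bypassTempDirValidation\":false}}";

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Bearer " + token)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}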
But I didn't find any reference or samples for calling it from Java, so I tried the approach below.
In my Spring Boot project I added this dependency:
<!-- https://mvnrepository.com/artifact/com.google.apis/google-api-services-dataflow -->
<dependency>
    <groupId>com.google.apis</groupId>
    <artifactId>google-api-services-dataflow</artifactId>
    <version>v1b3-rev20210825-1.32.1</version>
</dependency>
and added the code below in my controller:
public static void createJob() throws IOException {
    GoogleCredential credential = GoogleCredential
            .fromStream(new FileInputStream("myCertKey.json"))
            .createScoped(java.util.Arrays.asList("https://www.googleapis.com/auth/cloud-platform"));
    try {
        // This line throws the error shown below.
        Dataflow dataflow = new Dataflow.Builder(new LowLevelHttpRequest(), new JacksonFactory(),
                credential).setApplicationName("my-job").build();

        // RuntimeEnvironment
        RuntimeEnvironment env = new RuntimeEnvironment();
        env.setBypassTempDirValidation(false);
        // ...all my env configs added

        // Parameters
        HashMap<String, String> params = new HashMap<>();
        params.put("bigtableEmulatorPort", "-1");
        params.put("gcsPath", "gs://bucket//my.json");
        // ...all other params

        LaunchTemplateParameters content = new LaunchTemplateParameters();
        content.setJobName("Test-job");
        content.setEnvironment(env);
        content.setParameters(params);

        dataflow.projects().locations().templates().launch("project-id", "location", content);
    } catch (Exception e) {
        log.info("error occurred", e);
    }
}
This gives the error {"id":null,"message":"'boolean com.google.api.client.http.HttpTransport.isMtls()'"} on this line itself:
Dataflow dataflow = new Dataflow.Builder(new LowLevelHttpRequest(), new JacksonFactory(),
        credential).setApplicationName("my-job").build();
This is because the Dataflow builder expects an HttpTransport as its first argument, but I passed a LowLevelHttpRequest().
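From the client Javadoc, I believe the builder signature is Dataflow.Builder(HttpTransport, JsonFactory, HttpRequestInitializer), so I suspect the construction needs to look something like this untested sketch (it assumes GoogleNetHttpTransport from google-http-client is on the classpath):

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;

// ...
// Build a real HttpTransport instead of a LowLevelHttpRequest.
// Note: newTrustedTransport() throws GeneralSecurityException as well as IOException.
HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
Dataflow dataflow = new Dataflow.Builder(transport, JacksonFactory.getDefaultInstance(), credential)
        .setApplicationName("my-job")
        .build();

I am not sure whether that alone explains the isMtls() message, which looks like a NoSuchMethodError and so might instead point to an older google-http-client version somewhere on my Spring Boot classpath.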
I am not sure this is the correct way to implement it. Can anyone suggest any ideas on how to implement this? Any examples or references?
Thanks a lot :)