How to write Parquet files to AWS S3 using Apache Beam (Java)?
I am trying to convert JSON -> GenericRecord -> Parquet and write the result to S3. I am able to produce the Parquet files, but I don't know how to write them directly to S3 without first storing them on the local filesystem.
The code I wrote for this:
public static void main(String[] args) throws IOException {
    PipelineOptionsFactory.register(MainConfig.class);
    MainConfig options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MainConfig.class);

    Pipeline pipeLine = Pipeline.create(options);

    // Pass the AWS credentials through the pipeline options
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(options.getAWSAccessKey(), options.getAWSSecretKey());
    options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials));

    Schema jsonSchema = new Schema.Parser().parse(schemaString);
    logger.info(jsonSchema.getFields());

    pipeLine.apply("ReadMyFile", TextIO.read().from(options.getInput()))
            .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
            .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema))
            .apply("Generate the parquet files",
                    FileIO.<GenericRecord>write()
                            .via(ParquetIO.sink(jsonSchema))
                            .to(options.getOutput())
                            .withNumShards(1)
                            .withSuffix(".parquet"));

    pipeLine.run();
}
In the end, I just want that Parquet output to land directly in S3.
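For reference, here is a minimal sketch of what I think the S3-targeted version should look like. It assumes that MainConfig extends Beam's AwsOptions (it already accepts setAwsCredentialsProvider, so I believe it does), that the beam-sdks-java-io-amazon-web-services module is on the classpath so the s3:// filesystem gets registered, and that the class name, region, bucket, and output prefix below are placeholders of mine; JsonToGeneralRecord and schemaString are from my code above.

import java.io.IOException;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

public class JsonToParquetOnS3 { // placeholder class name

    public static void main(String[] args) throws IOException {
        PipelineOptionsFactory.register(MainConfig.class);
        MainConfig options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MainConfig.class);

        // Configure the S3 filesystem before creating the pipeline so the
        // s3:// scheme is usable for writes.
        options.setAwsRegion("us-east-1"); // placeholder: region of the target bucket
        options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(
                new BasicAWSCredentials(options.getAWSAccessKey(), options.getAWSSecretKey())));

        Pipeline pipeline = Pipeline.create(options);

        Schema jsonSchema = new Schema.Parser().parse(schemaString);

        pipeline.apply("ReadMyFile", TextIO.read().from(options.getInput()))
                .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
                .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema))
                .apply("Write Parquet to S3",
                        FileIO.<GenericRecord>write()
                                .via(ParquetIO.sink(jsonSchema))
                                // Pointing the sink at an s3:// path should make Beam
                                // write the shards straight to the bucket.
                                .to("s3://my-bucket/output/") // placeholder bucket/prefix
                                .withNumShards(1)
                                .withSuffix(".parquet"));

        pipeline.run().waitUntilFinish();
    }
}

The only real changes from my code are setting the region and credentials on the options before Pipeline.create(), and passing an s3:// path to FileIO.write().to(). I'm not sure whether this is enough, or whether something else is needed to avoid the local filesystem entirely.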