Apache Beam pipeline: OutOfMemoryException when writing Avro files to Google Cloud Storage with Google Dataflow
We have a batch pipeline developed with Apache Beam Java SDK 2.34.0, running on the Google Cloud Dataflow runner. One of its steps writes Avro files to Google Cloud Storage, and that write is throwing an OutOfMemory exception. The batch is trying to write around 800 Avro files, each no larger than 50 KB.
Error message from worker: An OutOfMemoryException occurred. Consider specifying higher memory instances in PipelineOptions.
java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: Java heap space
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.finishBundle(WriteFiles.java:974)
Caused by: java.lang.OutOfMemoryError: Java heap space
com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:579)
com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:380)
com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:308)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:528)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel$UploadOperation.call(AbstractGoogleAsyncWriteChannel.java:85)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
Configurations:
WorkerType: n2-standard-4
numShards: 10
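For context, the worker type and shard count are wired through pipeline options roughly as follows (a minimal sketch; the EvaluationWriteOptions name and its accessors are illustrative, not our actual code):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical options interface; names are assumptions for illustration only.
public interface EvaluationWriteOptions extends DataflowPipelineOptions {
  @Description("GCS path prefix for the Avro output files")
  ValueProvider<String> getAvroFilesPath();
  void setAvroFilesPath(ValueProvider<String> value);

  @Description("Number of shards per destination")
  ValueProvider<Integer> getShards();
  void setShards(ValueProvider<Integer> value);
}

The job is launched with --workerMachineType=n2-standard-4 and the shard option set to 10.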
Writer configuration:
public static Write<String, Evaluation> getAvroWriter(ValueProvider<String> avroFilesPath,
    ValueProvider<Integer> shards) {
  // Dynamic destinations: records are grouped by Evaluation id, and each
  // group is written out as <id>.avro under avroFilesPath.
  return FileIO.<String, Evaluation>writeDynamic()
      .withNumShards(shards)
      .by(Evaluation::getId)
      .withDestinationCoder(StringUtf8Coder.of())
      .withNaming(Contextful.fn(
          fileName -> (window, pane, numShards, shardIndex, compression) -> fileName + ".avro"))
      .via(AvroIO.sink(Evaluation.class))
      .to(avroFilesPath);
}
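The writer is applied in the pipeline roughly like this (again a sketch; readEvaluations is a stand-in for the real upstream read and transforms):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public static void main(String[] args) {
  EvaluationWriteOptions options =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(EvaluationWriteOptions.class);
  Pipeline pipeline = Pipeline.create(options);

  // Placeholder for the actual upstream steps that produce the records.
  PCollection<Evaluation> evaluations = readEvaluations(pipeline);

  evaluations.apply("WriteAvroFiles",
      getAvroWriter(options.getAvroFilesPath(), options.getShards()));

  pipeline.run();
}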
We took a heap dump for inspection and were surprised by the amount of memory held by stream/byte[] buffers:
[Screenshot: heap dump dominator tree]
[Screenshot: top_consumer_html]
Is something wrong in the Apache Beam IO library when used with the Dataflow runner?
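For what it's worth, the trace bottoms out in MediaHttpUploader.buildContentChunk, which buffers each in-flight resumable-upload chunk in a byte[], and with writeDynamic() each destination presumably has its own GCS write channel open at the same time. So one knob we are considering is the upload buffer size exposed through Beam's GcsOptions (a sketch; the 1 MB value is an illustrative guess, not a documented recommendation):

import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;

// Each open GCS write channel buffers an upload chunk on the heap; with many
// destinations open at once, the per-channel buffer size multiplies quickly.
// Lowering it trades upload throughput for heap headroom.
options.as(GcsOptions.class).setGcsUploadBufferSizeBytes(1024 * 1024);  // assumption: 1 MB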