Spark job fails with "Paused for longer than XXX seconds and unable to write pages to client"
I am working on a Spark job that reads data from a Cassandra DB and then performs some operations on the data, such as updating some records based on conditions. The job works for some keyspaces and tables, but for some other keyspaces it fails early with a stack trace like the one below (a rough sketch of the code path appears after the trace):
WARN 2022-03-30 21:39:50,082 org.apache.spark.scheduler.TaskSetManager: Lost task 2.0 in stage 0.0 (TID 2, 10.0.144.110, executor 0): com.datastax.driver.core.exceptions.DriverException: Paused for longer than 600 seconds and unable to write pages to client
at com.datastax.driver.core.exceptions.DriverException.copy(DriverException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:26)
at com.datastax.driver.core.DefaultContinuousPagingResult$RowIterator.maybeFetchNextResult(DefaultContinuousPagingResult.java:72)
at com.datastax.driver.core.DefaultContinuousPagingResult$RowIterator.computeNext(DefaultContinuousPagingResult.java:62)
at com.datastax.driver.core.DefaultContinuousPagingResult$RowIterator.computeNext(DefaultContinuousPagingResult.java:50)
at com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:143)
at com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:138)
at scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:12)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at java.util.Iterator.forEachRemaining(Iterator.java:115)
at com.company.spark.job.MySparkJobClass.lambda$handleMovingRackspaceImagesToS3$e3b46054$1(MySparkJobClass.java:167)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:219)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:934)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:934)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2073)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2073)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:344)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.DriverException: Paused for longer than 600 seconds and unable to write pages to client
at com.datastax.driver.core.exceptions.DriverException.copy(DriverException.java:37)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:184)
at com.datastax.driver.core.ContinuousPagingQueue.onResponse(ContinuousPagingQueue.java:148)
at com.datastax.driver.core.MultiResponseRequestHandler.setResult(MultiResponseRequestHandler.java:888)
at com.datastax.driver.core.MultiResponseRequestHandler.onSet(MultiResponseRequestHandler.java:600)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1253)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1160)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1407)
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1177)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1221)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
Caused by: com.datastax.driver.core.exceptions.ClientWriteException: Paused for longer than 600 seconds and unable to write pages to client
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:124)
at com.datastax.driver.core.Responses$Error$1.decode(Responses.java:58)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:303)
at com.datastax.driver.core.Message$ProtocolDecoder.decode(Message.java:274)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
... 29 more
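For context, here is a minimal sketch of the kind of job described above, assuming hypothetical class, keyspace, table, and contact-point names (the real class is MySparkJobClass and, per the stack trace, it iterates each partition inside foreachPartition):

    // Minimal sketch, not the actual job: scan a Cassandra table with the
    // Spark Cassandra Connector and walk each partition row by row.
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

    import com.datastax.spark.connector.japi.CassandraRow;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class CassandraScanSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setAppName("cassandra-scan-sketch")
                    .set("spark.cassandra.connection.host", "127.0.0.1"); // hypothetical contact point

            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                javaFunctions(sc)
                        .cassandraTable("my_keyspace", "my_table") // hypothetical names
                        .foreachPartition(rows -> {
                            while (rows.hasNext()) {
                                CassandraRow row = rows.next();
                                // conditional update / migration work would go here;
                                // slow per-row work keeps the server-side page stream
                                // open and can eventually trip the 600 s pause limit.
                            }
                        });
            }
        }
    }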
I found out from here that Spark executors have a default maximum of 60 heartbeat failures, with 10 s for the heartbeat interval, which adds up to the 600 s. But the question now is: what causes these executors to fail? Any hint that could help?
Update:
Number of times an executor tries sending heartbeats to the driver before it gives up and exits (with exit code 56).
Default: 60
For example, with max failures of 60 (the default) and spark.executor.heartbeatInterval of 10s, the executor will try to send heartbeats for up to 600 s (10 minutes).
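For reference, a hedged sketch of how those two knobs would be set explicitly; spark.executor.heartbeat.maxFailures is an internal Spark property, so its name and default may vary across versions:

    import org.apache.spark.SparkConf;

    public class HeartbeatConfSketch {
        // Illustrative defaults only: 60 failures x 10 s interval = 600 s
        // before the executor gives up (exit code 56).
        public static SparkConf build() {
            return new SparkConf()
                    .set("spark.executor.heartbeatInterval", "10s")    // default interval
                    .set("spark.executor.heartbeat.maxFailures", "60") // default retry count (internal)
                    .set("spark.network.timeout", "600s");             // must exceed the heartbeat interval
        }
    }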
Comments (1)
The ClientWriteException gets thrown by the Java driver when the server-side read took too long. Without knowing the table schema and what query your Spark job was running, it's hard to know why reads are taking a long time. Admittedly, the stack trace looks alien to me. You didn't specify the versions of the Spark connector and the Java driver, so I'm assuming that you're possibly using a very old version of the Java driver which is no longer supported.
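One thing worth checking is how the connector paces its reads when the consumer cannot keep up with the pages the server produces. This is only a sketch under the assumption of a 2.x-era Spark Cassandra Connector; verify the property names and sensible values against the reference configuration for the version you actually run:

    import org.apache.spark.SparkConf;

    public class ReadPacingSketch {
        // Hypothetical tuning values; smaller fetch pages and splits mean each
        // task consumes its page stream sooner, which reduces the time the
        // server spends waiting to write pages to the client.
        public static SparkConf build() {
            return new SparkConf()
                    .set("spark.cassandra.input.fetch.size_in_rows", "500") // rows per page fetch
                    .set("spark.cassandra.input.split.size_in_mb", "64")    // target size per Spark partition
                    .set("spark.cassandra.read.timeout_ms", "120000");      // per-read timeout in ms
        }
    }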
If you update your original question with background details such as versions, schema, the CQL query, and a minimal code sample to replicate the issue, I'd be happy to review it and update my answer. Cheers!