StreamingQueryException: Error while List shards

Posted 2025-01-26 15:31:53


I have a Kinesis data stream whose records I want to insert into AWS Redshift using AWS Glue. I created crawlers to bring in the source table and the target table, and they are working fine.

The code works when I save the records to S3 instead of Redshift, but when I use Redshift as the target I keep getting this error:

"StreamingQueryException: Error while List shards"


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql import DataFrame, Row
import datetime
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Kinesis Stream
dataframe_KinesisStream_node1 = glueContext.create_data_frame.from_catalog(
    database="dev",
    table_name="ventilators_table_kinesis",
    additional_options={"startingPosition": "earliest", "inferSchema": "false"},
    transformation_ctx="dataframe_KinesisStream_node1",
)


def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        KinesisStream_node1 = DynamicFrame.fromDF(
            data_frame, glueContext, "from_data_frame"
        )
        # Script generated for node ApplyMapping
        ApplyMapping_node2 = ApplyMapping.apply(
            frame=KinesisStream_node1,
            mappings=[
                ("ventilatorid", "int", "ventilatorid", "int"),
                ("eventtime", "string", "eventtime", "string"),
                ("serialnumber", "string", "serialnumber", "string"),
                ("pressurecontrol", "int", "pressurecontrol", "int"),
                ("o2stats", "int", "o2stats", "int"),
                ("minutevolume", "int", "minutevolume", "int"),
                ("manufacturer", "string", "manufacturer", "string"),
            ],
            transformation_ctx="ApplyMapping_node2",
        )

        # Script generated for node Redshift Cluster
        RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
            frame=ApplyMapping_node2,
            database="dev",
            table_name="dev_projectlightspeed_ventilators_table",
            redshift_tmp_dir=args["TempDir"],
            transformation_ctx="RedshiftCluster_node3",
        )


glueContext.forEachBatch(
    frame=dataframe_KinesisStream_node1,
    batch_function=processBatch,
    options={
        "windowSize": "5 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
job.commit()

Please help!


Comments (2)

泪冰清 2025-02-02 15:31:53


I faced this same error, StreamingQueryException: Error while List shards, while working on a Glue streaming job with S3 as the output. As the error message indicates, the problem turned out to be missing Kinesis permissions (to read the shards) in the job's IAM role.

Granting the proper Kinesis permissions worked for me.

But since your job works fine with S3 output and only fails with Redshift as the target, check that the job role has proper permissions for both Redshift and Kinesis.
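As a quick sanity check, you can run a call under the job's role and hit the same Kinesis API the streaming source uses. A minimal sketch with boto3, assuming a hypothetical stream name ventilators_stream (substitute the stream your Glue job actually reads):

import boto3
from botocore.exceptions import ClientError

# Hypothetical stream name; replace with your Glue job's source stream.
STREAM_NAME = "ventilators_stream"

kinesis = boto3.client("kinesis")

try:
    # This is the same API call that fails behind
    # "StreamingQueryException: Error while List shards".
    shards = kinesis.list_shards(StreamName=STREAM_NAME)["Shards"]
    print(f"ListShards succeeded: {len(shards)} shard(s) visible")
except ClientError as err:
    # An access-denied error here points at the IAM role, not the job script.
    print(f"ListShards failed: {err.response['Error']['Code']}")

If this call is denied, the role typically needs Kinesis read actions such as kinesis:ListShards, kinesis:DescribeStream, kinesis:GetShardIterator, and kinesis:GetRecords on the stream.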

眼藏柔 2025-02-02 15:31:53


I also faced this issue, but it turned out to be a different problem. I am connecting AWS Glue to a Kinesis stream in another account. All the permissions were set up correctly to allow cross-account access, and I was still getting the same error: StreamingQueryException: Error while List shards. Eventually I found an obscure message in the Glue logs:

'roleSessionName' failed to satisfy constraint: Member must have length less than or equal to 64

Glue was building this roleSessionName from the name of the Firehose with a UUID appended, which exceeded the limit. To solve it, I had to create a new Firehose with a shorter name.
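To see why a long resource name trips this limit, here is a minimal sketch of the constraint. The "<base-name>-<uuid>" scheme is an assumption inferred from the log message above; since a UUID string is 36 characters, the base name can contribute at most 27 characters once a separator is counted.

import uuid

MAX_ROLE_SESSION_NAME = 64  # STS AssumeRole roleSessionName length limit


def session_name_fits(base_name: str) -> bool:
    # Assumed naming scheme from the log message: "<base-name>-<uuid>".
    candidate = f"{base_name}-{uuid.uuid4()}"
    return len(candidate) <= MAX_ROLE_SESSION_NAME


print(session_name_fits("short-firehose"))  # True: 14 + 1 + 36 = 51 chars
print(session_name_fits("a-very-long-firehose-name-that-keeps-going-and-going"))  # False: 89 chars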
