StreamingQueryException: Error while List shards
I have a Kinesis data stream whose records I want to insert into Amazon Redshift using AWS Glue. I created crawlers to bring in the source and target tables, and they work fine.
The code works when I save the records to S3 instead of Redshift, but with Redshift as the target I keep getting this error:
"StreamingQueryException: Error while List shards"
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrame, Row
import datetime
from awsglue import DynamicFrame
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# Script generated for node Kinesis Stream
dataframe_KinesisStream_node1 = glueContext.create_data_frame.from_catalog(
    database="dev",
    table_name="ventilators_table_kinesis",
    additional_options={"startingPosition": "earliest", "inferSchema": "false"},
    transformation_ctx="dataframe_KinesisStream_node1",
)


def processBatch(data_frame, batchId):
    if data_frame.count() > 0:
        KinesisStream_node1 = DynamicFrame.fromDF(
            data_frame, glueContext, "from_data_frame"
        )
        # Script generated for node ApplyMapping
        ApplyMapping_node2 = ApplyMapping.apply(
            frame=KinesisStream_node1,
            mappings=[
                ("ventilatorid", "int", "ventilatorid", "int"),
                ("eventtime", "string", "eventtime", "string"),
                ("serialnumber", "string", "serialnumber", "string"),
                ("pressurecontrol", "int", "pressurecontrol", "int"),
                ("o2stats", "int", "o2stats", "int"),
                ("minutevolume", "int", "minutevolume", "int"),
                ("manufacturer", "string", "manufacturer", "string"),
            ],
            transformation_ctx="ApplyMapping_node2",
        )
        # Script generated for node Redshift Cluster
        RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
            frame=ApplyMapping_node2,
            database="dev",
            table_name="dev_projectlightspeed_ventilators_table",
            redshift_tmp_dir=args["TempDir"],
            transformation_ctx="RedshiftCluster_node3",
        )


glueContext.forEachBatch(
    frame=dataframe_KinesisStream_node1,
    batch_function=processBatch,
    options={
        "windowSize": "5 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
job.commit()
Please help!
2 Answers
I faced this same error,
StreamingQueryException: Error while List shards
while working with a Glue streaming job that writes to S3. As the error message indicates, the problem turned out to be missing Kinesis permissions (needed to read the shards) in the IAM role of that job. Granting the proper Kinesis permissions fixed it for me.
Since your job works fine with S3 output but fails with Redshift as the target, check that the job role has the proper permissions for both Redshift and Kinesis.
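For reference, here is a minimal sketch of granting the Kinesis read actions a Glue streaming source needs, attached as an inline policy with boto3. The role name, policy name, and stream ARN are placeholders, not values from the question, and your policy may need to be scoped differently:

# Sketch: attach Kinesis read permissions to the Glue job role.
# The role name, policy name, and stream ARN are placeholders.
import json
import boto3

iam = boto3.client("iam")

kinesis_read_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
            ],
            # Replace with the ARN of your own stream.
            "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/ventilators_stream",
        }
    ],
}

iam.put_role_policy(
    RoleName="MyGlueStreamingJobRole",
    PolicyName="KinesisReadForGlueStreaming",
    PolicyDocument=json.dumps(kinesis_read_policy),
)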
I also faced this issue, but it turned out to be a different problem. I am connecting AWS Glue to a Kinesis stream in another account. All the permissions were set up correctly to allow cross-account access, and I was still getting the same error:
StreamingQueryException: Error while List shards
Eventually I found an obscure message in the Glue logs: Glue was building the roleSessionName from the name of the Firehose and appending a UUID, and the result exceeded the STS RoleSessionName length limit. To solve it, I had to create a new Firehose with a shorter name.
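As a rough illustration of that failure mode, a sketch assuming the 64-character STS RoleSessionName limit and a hypothetical "name plus hyphenated UUID" session-name format (the exact format Glue uses is an assumption here, not documented behaviour):

# Sketch: check whether a "firehose-name + '-' + UUID" session name would exceed
# the STS RoleSessionName limit (64 characters). The name format is an assumption
# for illustration only.
import uuid

ROLE_SESSION_NAME_MAX = 64


def session_name_fits(firehose_name: str) -> bool:
    candidate = f"{firehose_name}-{uuid.uuid4()}"
    print(f"{len(candidate):>3} chars: {candidate}")
    return len(candidate) <= ROLE_SESSION_NAME_MAX


session_name_fits("projectlightspeed-ventilators-delivery-stream")  # too long with the UUID suffix
session_name_fits("ventilators-fh")                                 # fits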