Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object
I have a simple Apache Beam program that reads an Avro file from GCP Cloud Storage and writes it to BigQuery.
# Imports
import logging
import os
import datetime

# Apache Beam
import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions

# Google Cloud client libraries
from google.cloud import storage
from google.oauth2 import service_account
from google.cloud import language

KEY_PATH = '/home/kiruba/tutorials/gcp-pubsub/bigqueryprojectkey.json'
credentials = service_account.Credentials.from_service_account_file(KEY_PATH)
client = language.LanguageServiceClient(credentials=credentials)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_PATH

PROJECT = 'bigqueryproject-XXXX'
BUCKET = 'XXXX_customdataflow1'
currentdatetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET_PATH = 'pubsub_avro/cloud_storage_transfer/avrofile_' + currentdatetime + ".avro"

# Instantiate a storage client
client = storage.Client(credentials=credentials)

# Set the log level to INFO
root = logging.getLogger()
root.setLevel(logging.INFO)

class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        # print(os.path.getsize('/home/kiruba/tutorials/gcp-pubsub/pubsubprojectdir/avrofile_20220624161520.avro'))
        print(element)
        return [len(element)]

# Create a pipeline
p = beam.Pipeline(options=PipelineOptions())

# Create a PCollection from the Avro file and write it to BigQuery
transaction = (p
    | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro('gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220703112646.avro')
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bigqueryproject-XXXX:movielens.movietitle_pubsub'))

# Run the pipeline
def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner'
    ]
    result = p.run()
    # wait until pipeline processing is complete
    result.wait_until_finish()
I execute the program with the following command:
python 03_05_classictemplate_pipeline_avro_bigquery.py --project bigqueryproject-XXXX --runner DataflowRunner --staging_location gs://XXXX_customdataflow/staging --temp_location gs://XXXX_customdataflow/staging --region=us-central1
I get the error:
{'gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220624161520.avro': HttpUnauthorizedError('HttpError accessing <https://www.googleapis.com/storage/v1/b/my_bigquery_poc/o/pubsub_avro%2Fcloud_storage_transfer%2Favrofile_20220624161520.avro?alt=json>: response: <{\'x-guploader-uploadid\': \'ADPycds1MWSKBe3m3NRS2Z9YTc1gvjXyoGIXN7HCr8DrWacKDa0w-6-ImqyFubVP4ewIhsp09dCnSbRYi67sZbClEo_CtHJMlSlR\', \'content-type\': \'application/json; charset=UTF-8\', \'date\': \'Sat, 02 Jul 2022 04:21:18 GMT\', \'vary\': \'Origin, X-Origin\', \'www-authenticate\': \'Bearer realm="https://accounts.google.com/"\', \'cache-control\': \'no-cache, no-store, max-age=0, must-revalidate\', \'expires\': \'Mon, 01 Jan 1990 00:00:00 GMT\', \'pragma\': \'no-cache\', \'content-length\': \'432\', \'server\': \'UploadServer\', \'alt-svc\': \'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"\', \'status\': \'401\'}>, content <{\n "error": {\n "code": 401,\n "message": "Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object.",\n "errors": [\n {\n "message": "Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object.",\n "domain": "global",\n "reason": "required",\n "locationType": "header",\n "location": "Authorization"\n }\n ]\n }\n}\n>')}
Why is my program still treating the access as anonymous?
How can I pass valid service-account credentials to the Dataflow runner?
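For reference, a minimal sketch of one common way to wire the credentials through, not a confirmed fix for this question. Two things usually matter: the machine building the pipeline graph needs Application Default Credentials (e.g. GOOGLE_APPLICATION_CREDENTIALS exported in the shell before Python starts, since ReadFromAvro validates the GCS path at construction time), and the Dataflow workers run as the job's service account, selectable with the service_account_email option. The key path, project, and bucket names below are taken from the question; the service-account address is hypothetical.

import os
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

KEY_PATH = '/home/kiruba/tutorials/gcp-pubsub/bigqueryprojectkey.json'
# Must be set before any GCS access; exporting it in the shell
# (export GOOGLE_APPLICATION_CREDENTIALS=...) before invoking Python is safest.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = KEY_PATH

options = PipelineOptions(
    flags=[],  # ignore sys.argv; all options are set explicitly here
    runner='DataflowRunner',
    project='bigqueryproject-XXXX',
    region='us-central1',
    staging_location='gs://XXXX_customdataflow/staging',
    temp_location='gs://XXXX_customdataflow/staging',
    # Hypothetical address: the workers run as this account, so it needs
    # storage.objects.get on the source bucket and write access to BigQuery.
    service_account_email='my-sa@bigqueryproject-XXXX.iam.gserviceaccount.com',
)

# The with-block runs the pipeline and waits for completion on exit.
with beam.Pipeline(options=options) as p:
    (p
     | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro(
         'gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220703112646.avro')
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         'bigqueryproject-XXXX:movielens.movietitle_pubsub'))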
Finally it worked.