Anonymous caller does not have storage.objects.get access



I have a simple Apache Beam program which reads an Avro file from GCP Cloud Storage and writes it to BigQuery.


#import print library
import logging
import os
import datetime

#import apache beam library
import apache_beam as beam
from apache_beam import window
from google.cloud import storage

from google.oauth2 import service_account
from google.cloud import language

KEY_PATH = '/home/kiruba/tutorials/gcp-pubsub/bigqueryprojectkey.json'
credentials = service_account.Credentials.from_service_account_file(KEY_PATH)
client = language.LanguageServiceClient(credentials=credentials)
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = KEY_PATH

PROJECT = 'bigqueryproject-XXXX'
BUCKET = 'XXXX_customdataflow1'

currentdatetime = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
BUCKET_PATH = 'pubsub_avro/cloud_storage_transfer/avrofile_' + currentdatetime + ".avro"

# Instantiates a client
client = storage.Client(credentials=credentials)

#import pipeline options.
from apache_beam.options.pipeline_options import PipelineOptions

#Set log level to info
root = logging.getLogger()
root.setLevel(logging.INFO)


class ComputeWordLengthFn(beam.DoFn):
  def process(self, element):
    # print(os.path.getsize('/home/kiruba/tutorials/gcp-pubsub/pubsubprojectdir/avrofile_20220624161520.avro'))
    print(element)
    return [len(element)]


#Create a pipeline
p = beam.Pipeline(options=PipelineOptions())

#create a PCollection from the AVRO file
transaction = (p
    | 'Read all from AVRO' >> beam.io.avroio.ReadFromAvro('gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220703112646.avro')
    | 'WriteToBigQuery' >> beam.io.WriteToBigQuery('bigqueryproject-XXXX:movielens.movietitle_pubsub'))


# Run the pipeline
def run():
   argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner'
   ]

result = p.run()
# wait until pipeline processing is complete
result.wait_until_finish()

I execute the program using the command:

python 03_05_classictemplate_pipeline_avro_bigquery.py --project bigqueryproject-XXXX --runner DataflowRunner --staging_location gs://XXXX_customdataflow/staging --temp_location gs://XXXX_customdataflow/staging --region=us-central1

I get the error:



{'gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220624161520.avro': HttpUnauthorizedError('HttpError accessing <https://www.googleapis.com/storage/v1/b/my_bigquery_poc/o/pubsub_avro%2Fcloud_storage_transfer%2Favrofile_20220624161520.avro?alt=json>: response: <{
  'x-guploader-uploadid': 'ADPycds1MWSKBe3m3NRS2Z9YTc1gvjXyoGIXN7HCr8DrWacKDa0w-6-ImqyFubVP4ewIhsp09dCnSbRYi67sZbClEo_CtHJMlSlR',
  'content-type': 'application/json; charset=UTF-8',
  'date': 'Sat, 02 Jul 2022 04:21:18 GMT',
  'vary': 'Origin, X-Origin',
  'www-authenticate': 'Bearer realm="https://accounts.google.com/"',
  'cache-control': 'no-cache, no-store, max-age=0, must-revalidate',
  'expires': 'Mon, 01 Jan 1990 00:00:00 GMT',
  'pragma': 'no-cache',
  'content-length': '432',
  'server': 'UploadServer',
  'alt-svc': 'h3=":443"; ma=2592000,h3-29=":443"; ma=2592000,h3-Q050=":443"; ma=2592000,h3-Q046=":443"; ma=2592000,h3-Q043=":443"; ma=2592000,quic=":443"; ma=2592000; v="46,43"',
  'status': '401'
}>, content <{
  "error": {
    "code": 401,
    "message": "Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object.",
    "errors": [
      {
        "message": "Anonymous caller does not have storage.objects.get access to the Google Cloud Storage object.",
        "domain": "global",
        "reason": "required",
        "locationType": "header",
        "location": "Authorization"
      }
    ]
  }
}>')}

Why is my program still treating the caller as anonymous?
How can I pass a valid service account credential to the Dataflow runner?
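
For reference, below is an untested sketch of how I assumed the credential wiring should look on the runner side; the service account email is a placeholder, and I am not sure this is actually the missing piece.

import os
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions

# Application Default Credentials: the key file must be visible before any
# GCS/BigQuery client in this process is created.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/kiruba/tutorials/gcp-pubsub/bigqueryprojectkey.json"

# With flags=None, PipelineOptions also parses sys.argv, so the
# --runner/--region flags from the launch command are picked up here.
options = PipelineOptions()
gcp = options.view_as(GoogleCloudOptions)
gcp.project = "bigqueryproject-XXXX"
gcp.staging_location = "gs://XXXX_customdataflow1/staging/"
gcp.temp_location = "gs://XXXX_customdataflow1/staging/"
# Placeholder: the service account the Dataflow workers should run as.
gcp.service_account_email = "dataflow-sa@bigqueryproject-XXXX.iam.gserviceaccount.com"

# The context manager runs the pipeline and waits for it to finish.
with beam.Pipeline(options=options) as p:
    (p
     | "Read all from AVRO" >> beam.io.avroio.ReadFromAvro(
         "gs://my_bigquery_poc/pubsub_avro/cloud_storage_transfer/avrofile_20220703112646.avro")
     | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
         "bigqueryproject-XXXX:movielens.movietitle_pubsub"))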


Comments (1)

肥爪爪 2025-02-19 09:14:24


Finally it worked.

  1. pip uninstall google-cloud-storage   # this will remove your latest version, 2.4.0
  2. pip install google-cloud-storage==1.44.0
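
My guess, and it is only a guess, is that the downgrade sidesteps a dependency conflict between google-cloud-storage 2.x and the GCS client stack this Beam version pulls in, which can leave storage requests unauthenticated. A quick sanity check that the pinned version is the one actually imported:

import google.cloud.storage as gcs

# Should print 1.44.0 after the downgrade above.
print(gcs.__version__)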