How to avoid an isinstance() TypeError when writing to BigQuery with Apache Beam (Python) on Dataflow?

Published 2025-02-03 17:14:25


So I've got a very basic pipeline in Python using Apache Beam and Google Cloud that takes data from Cloud Storage, removes some columns, and moves it into BigQuery, and it works up until that final step.

When using WriteToBigQuery(table = ...), I get the following error:

TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

This is coming from it checking isinstance(table, TableReference) as part of the parse_table_reference() function. The weird thing is that if I manually check:

table = TableReference.from_string(...)
print(isinstance(table, TableReference))

Then it comes back as True just fine.

I've tried formatting the table reference in various ways, and all of them give the same error.

Is this an issue with the way I'm providing the table reference, or is there another way to do this that avoids the error?

Traceback

    TypeError                                 Traceback (most recent call last)
    Input In [1], in <cell line: 37>()
         38 options = PipelineOptions()
         39 p = beam.Pipeline(options = options)
         41 (
         42 p 
         43 | "Read" >> beam.io.textio.ReadFromText(('test_lender_2022-04-17.csv'), skip_header_lines = 1)
         44 | "Split" >> beam.ParDo(Split())
         45 #| "WriteToFile" >> beam.io.textio.WriteToText('testoutput.csv')
    ---> 46 | "WriteToBQ" >> beam.io.WriteToBigQuery(
         47     table = 'other-lender-uploads-test:Lender_Data.Test_Lender', 
         48     schema = 'Date: STRING, Name: STRING', 
         49     write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
         50 )
         52 result = p.run()
    
    File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery.py:2083, in WriteToBigQuery.__init__(self, table, dataset, project, schema, create_disposition, write_disposition, kms_key, batch_size, max_file_size, max_files_per_bundle, test_client, custom_gcs_temp_location, method, insert_retry_strategy, additional_bq_parameters, table_side_inputs, schema_side_inputs, triggering_frequency, validate, temp_file_format, ignore_insert_ids, with_auto_sharding, ignore_unknown_columns, load_job_project_id)
       2081 self._dataset = dataset
       2082 self._project = project
    -> 2083 self.table_reference = bigquery_tools.parse_table_reference(
       2084     table, dataset, project)
       2085 self.create_disposition = BigQueryDisposition.validate_create(
       2086     create_disposition)
       2087 self.write_disposition = BigQueryDisposition.validate_write(
       2088     write_disposition)
    
    File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:240, in parse_table_reference(table, dataset, project)
        212 def parse_table_reference(table, dataset=None, project=None):
        213   """Parses a table reference into a (project, dataset, table) tuple.
        214 
        215   Args:
       (...)
        237       format.
        238   """
    --> 240   if isinstance(table, TableReference):
        241     return TableReference(
        242         projectId=table.projectId,
        243         datasetId=table.datasetId,
        244         tableId=table.tableId)
        245   elif callable(table):
    
    TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union

2 Answers

牵强ㄟ 2025-02-10 17:14:25

Please install apache-beam[gcp] instead of plain apache-beam. Try:

pip install apache-beam[gcp]
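This would explain the error message exactly. A likely failure mode (an assumption based on how Beam guards its optional GCP imports): without the [gcp] extra, the BigQuery client classes can't be imported, and the `TableReference` name that `parse_table_reference()` checks against ends up as `None` rather than a class. Passing `None` as the second argument to `isinstance()` reproduces the traceback's message:

```python
# Stand-in for the failed optional import: this is what the name is
# assumed to hold when apache-beam is installed without the [gcp] extra.
TableReference = None

try:
    # Mirrors the check inside parse_table_reference():
    # isinstance(table, TableReference)
    isinstance('project:dataset.table', TableReference)
except TypeError as err:
    # Same "arg 2 must be a type" message as in the question's traceback.
    print(err)
```

This also explains why the manual `isinstance(table, TableReference)` check in the question succeeds: there, `TableReference` was imported directly in an environment where the import worked, so it is a real class.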
鯉魚旗 2025-02-10 17:14:25


In my case (macOS), pip install apache-beam[gcp] didn't find the package, but

pip install 'apache-beam[gcp]'

(with quotation marks) did.
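The quoting matters because zsh (the default shell on recent macOS versions) treats `[...]` as a filename glob pattern, so an unquoted extras specifier never reaches pip as written. A small sketch of the difference; the pip lines are shown commented since they would actually install the package:

```shell
# Quoting (or escaping) preserves the literal brackets so pip sees
# the extras specifier instead of the shell expanding it as a glob.
spec='apache-beam[gcp]'
echo "$spec"

# Equivalent invocations that both pass the literal string to pip:
#   pip install 'apache-beam[gcp]'
#   pip install apache-beam\[gcp\]
```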
