How can I avoid this TypeError when using Apache Beam (Python) with Dataflow?
So I've got a very basic pipeline in Python using Apache Beam and Google Cloud that takes data from Cloud Storage, removes some columns, and moves it into BigQuery, and it works up until that final bit.
When using WriteToBigQuery(table = ...), I get the following error:
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
This is coming from the isinstance(table, TableReference) check inside the parse_table_reference() function. The weird thing is that if I manually check:
table = TableReference.from_string(...)
print(isinstance(table, TableReference))
Then it comes back as True just fine.
I've tried formatting the table reference in various ways and all of them give the same error.
Is this an issue with the way that I'm providing the table reference, or is there another way to do this that avoids the error?
Traceback
TypeError Traceback (most recent call last)
Input In [1], in <cell line: 37>()
38 options = PipelineOptions()
39 p = beam.Pipeline(options = options)
41 (
42 p
43 | "Read" >> beam.io.textio.ReadFromText(('test_lender_2022-04-17.csv'), skip_header_lines = 1)
44 | "Split" >> beam.ParDo(Split())
45 #| "WriteToFile" >> beam.io.textio.WriteToText('testoutput.csv')
---> 46 | "WriteToBQ" >> beam.io.WriteToBigQuery(
47 table = 'other-lender-uploads-test:Lender_Data.Test_Lender',
48 schema = 'Date: STRING, Name: STRING',
49 write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND)
50 )
52 result = p.run()
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery.py:2083, in WriteToBigQuery.__init__(self, table, dataset, project, schema, create_disposition, write_disposition, kms_key, batch_size, max_file_size, max_files_per_bundle, test_client, custom_gcs_temp_location, method, insert_retry_strategy, additional_bq_parameters, table_side_inputs, schema_side_inputs, triggering_frequency, validate, temp_file_format, ignore_insert_ids, with_auto_sharding, ignore_unknown_columns, load_job_project_id)
2081 self._dataset = dataset
2082 self._project = project
-> 2083 self.table_reference = bigquery_tools.parse_table_reference(
2084 table, dataset, project)
2085 self.create_disposition = BigQueryDisposition.validate_create(
2086 create_disposition)
2087 self.write_disposition = BigQueryDisposition.validate_write(
2088 write_disposition)
File ~\AppData\Local\Programs\Python\Python310\lib\site-packages\apache_beam\io\gcp\bigquery_tools.py:240, in parse_table_reference(table, dataset, project)
212 def parse_table_reference(table, dataset=None, project=None):
213 """Parses a table reference into a (project, dataset, table) tuple.
214
215 Args:
(...)
237 format.
238 """
--> 240 if isinstance(table, TableReference):
241 return TableReference(
242 projectId=table.projectId,
243 datasetId=table.datasetId,
244 tableId=table.tableId)
245 elif callable(table):
TypeError: isinstance() arg 2 must be a type, a tuple of types, or a union
2 Answers
Please install apache-beam[gcp] instead of plain apache-beam. Try:
pip install apache-beam[gcp]
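A likely mechanism for the error (an assumption inferred from the traceback, not verified against the Beam source): without the [gcp] extras, the guarded import of the BigQuery client classes inside apache_beam fails, and the TableReference name falls back to None. Passing None as the second argument to isinstance() then raises exactly the TypeError from the question, as this minimal sketch shows:

```python
# Stand-in for what bigquery_tools may end up with when the optional
# GCP client import fails (assumption: the name falls back to None).
TableReference = None

table = "other-lender-uploads-test:Lender_Data.Test_Lender"
try:
    # With a None "type", isinstance() cannot perform the check and raises.
    isinstance(table, TableReference)
except TypeError as e:
    print(e)  # isinstance() arg 2 must be a type, a tuple of types, or a union
```

This would also explain why the manual check in the question returns True: there, TableReference was imported directly and resolved to a real class, so isinstance() behaves normally.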
In my case (macOS), pip install apache-beam[gcp] didn't find the package, but pip install 'apache-beam[gcp]' (with quotation marks, so the shell doesn't treat the square brackets as a glob pattern) did.