Trigger Cloud Storage - Dataflow
I need some help here. I have a custom pipeline built with Apache Beam that picks up CSV files from a folder in a Cloud Storage bucket and loads the data into a BigQuery table. The BigQuery side already works exactly the way I want, but since it is a batch pipeline it only runs when I launch the Dataflow job myself. I want to automate this so that the job runs on its own whenever a new file is uploaded. How can I do that?
Dataflow template:
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Point the Google client libraries at the service-account key before anything authenticates.
serviceAccount = r'C:\Users\paulo.santos\Documents\CURSO DATA FLOW\Python-master\curso-dataflow-beam-347613-c998cb1e5f49.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = serviceAccount

pipeline_options = PipelineOptions.from_dictionary({
    'project': 'curso-dataflow-beam-347613',
    'runner': 'DataflowRunner',
    'region': 'southamerica-east1',
    'staging_location': 'gs://curso-apachebeam-dataflow/temp',
    'temp_location': 'gs://curso-apachebeam-dataflow/temp',
    # Because template_location is set, running this script stages a classic
    # template to GCS instead of executing the job immediately.
    'template_location': 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery',
    'save_main_session': True,
})

p1 = beam.Pipeline(options=pipeline_options)


def criar_dict(record):
    """Turn a split CSV row into a dict matching the BigQuery schema."""
    dict_ = {}
    dict_['name'] = record[0]      # the original snippet was missing this assignment
    dict_['company'] = record[1]
    dict_['pin'] = record[2]
    return dict_


table_schema = 'name:STRING, company:STRING, pin:INTEGER'
tabela = 'projeto_curso_dataflow.Curso_dataflow_projeto'

Tabela_Dados = (
    p1
    | "Importar Dados" >> beam.io.ReadFromText(
        r"gs://curso-apachebeam-dataflow/entrada/dataset.csv", skip_header_lines=2)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Criar um dic" >> beam.Map(criar_dict)
    | "Gravar no BigQuery" >> beam.io.WriteToBigQuery(
        tabela,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        custom_gcs_temp_location='gs://curso-apachebeam-dataflow/temp')
)

p1.run()
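
As a quick sanity check on the row-to-dict mapping above, here is a minimal standalone sketch using a made-up CSV line in the name,company,pin layout the pipeline assumes (the real contents of dataset.csv are not shown in the question):

# Hypothetical sample row; the real dataset.csv is not shown here.
sample_line = "Alice,Acme Corp,1234"

record = sample_line.split(',')   # what the "Separar por Vírgulas" step produces
row = {'name': record[0], 'company': record[1], 'pin': record[2]}
print(row)                        # {'name': 'Alice', 'company': 'Acme Corp', 'pin': '1234'}

Note that 'pin' is still a string at this point; BigQuery load jobs will generally coerce it into the INTEGER column, but casting with int(record[2]) inside criar_dict is the more explicit option.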
1 Answer
If you have a high throughput of file writes, you can use the Dataflow template "Text Files on Cloud Storage to BigQuery (Stream)".
This will create a streaming pipeline.
Here is an example:
https://cloud.google.com/dataflow/docs/guides/templates/provided-streaming#text-files-on-cloud-storage-to-bigquery-stream
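
As a rough illustration of launching that provided template programmatically, here is a sketch using the Dataflow templates API. The job name, the GCS paths for the UDF and schema files, and the parameter names are assumptions based on the documentation linked above, so double-check them there:

from googleapiclient.discovery import build  # pip install google-api-python-client

dataflow = build('dataflow', 'v1b3')

# Template path and parameter names as documented for the provided
# "Text Files on Cloud Storage to BigQuery (Stream)" template; the other
# GCS paths below are hypothetical placeholders.
request = dataflow.projects().locations().templates().launch(
    projectId='curso-dataflow-beam-347613',
    location='southamerica-east1',
    gcsPath='gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery',
    body={
        'jobName': 'storage-to-bigquery-stream',
        'parameters': {
            'inputFilePattern': 'gs://curso-apachebeam-dataflow/entrada/*.csv',
            'JSONPath': 'gs://curso-apachebeam-dataflow/config/bq_schema.json',
            'outputTable': 'curso-dataflow-beam-347613:projeto_curso_dataflow.Curso_dataflow_projeto',
            'javascriptTextTransformGcsPath': 'gs://curso-apachebeam-dataflow/config/transform.js',
            'javascriptTextTransformFunctionName': 'transform',
            'bigQueryLoadingTemporaryDirectory': 'gs://curso-apachebeam-dataflow/temp',
        },
    },
)
print(request.execute())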
But if files arrive in your bucket at long or irregular intervals, an approach based on Cloud Functions will be better.
Here is a good step-by-step guide:
https://medium.com/@aishwarya.gupta3/cloud-function-to-start-a-data-flow-job-on-a-new-file-upload-in-google-cloud-storage-using-trigger-30270b31a06d
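
In outline, that approach is a Cloud Function subscribed to the bucket's object-finalize event which launches the template you already staged at gs://curso-apachebeam-dataflow/template/storage_to_bigquery. Below is a minimal sketch under those assumptions, for a background (1st gen) Cloud Function; the function name trigger_dataflow and the job naming are illustrative:

# main.py for a Cloud Function deployed with a google.storage.object.finalize
# trigger on the bucket; requirements.txt needs google-api-python-client.
from googleapiclient.discovery import build

PROJECT = 'curso-dataflow-beam-347613'
REGION = 'southamerica-east1'
TEMPLATE = 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery'


def trigger_dataflow(event, context):
    """Launch the staged Dataflow template whenever a new object is finalized."""
    print('Triggered by file: {}'.format(event['name']))
    dataflow = build('dataflow', 'v1b3')
    dataflow.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE,
        body={'jobName': 'storage-to-bigquery-' + context.event_id},  # event_id keeps job names unique
    ).execute()

Note that the template staged above reads a fixed input path (entrada/dataset.csv), so every launch reprocesses that same file; to process only the newly uploaded object, you would expose the input path as a ValueProvider template parameter and pass event['name'] through the 'parameters' field of the launch body.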
See ya.