Trigger Cloud Storage - Dataflow

Posted 2025-01-25 00:50:53

I'm just getting started and need some help. I have a custom template that I created with Apache Beam: the pipeline takes a CSV file from a folder inside a Cloud Storage bucket and writes the data to a BigQuery table, and that part already works the way I want. But since it is a batch pipeline, it only runs when I launch the Dataflow job myself, so I want to automate it: whenever a new file is uploaded, this job should run by itself. How can I do that?

Dataflow template

import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

# Pipeline options: because 'template_location' is set, running this script
# stages a classic Dataflow template at that GCS path instead of executing
# the job right away.
pipeline_options = {
    'project': 'curso-dataflow-beam-347613',
    'runner': 'DataflowRunner',
    'region': 'southamerica-east1',
    'staging_location': 'gs://curso-apachebeam-dataflow/temp',
    'temp_location': 'gs://curso-apachebeam-dataflow/temp',
    'template_location': 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery',
    'save_main_session': True}

pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
p1 = beam.Pipeline(options=pipeline_options)

# Service-account key used locally to authenticate against Google Cloud.
serviceAccount = r'C:\Users\paulo.santos\Documents\CURSO DATA FLOW\Python-master\curso-dataflow-beam-347613-c998cb1e5f49.json'
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = serviceAccount


def criar_dict(record):
    # Convert one split CSV row into a dict matching the BigQuery schema.
    dict_ = {}
    dict_['name'] = record[0]
    dict_['company'] = record[1]
    dict_['pin'] = record[2]
    return dict_

table_schema = 'name:STRING, company:STRING, pin:INTEGER'
tabela = 'projeto_curso_dataflow.Curso_dataflow_projeto'

Tabela_Dados = (
    p1
    | "Importar Dados" >> beam.io.ReadFromText(
        'gs://curso-apachebeam-dataflow/entrada/dataset.csv',
        skip_header_lines=2)
    | "Separar por Vírgulas" >> beam.Map(lambda record: record.split(','))
    | "Criar um dic" >> beam.Map(criar_dict)
    | "Gravar no BigQuery" >> beam.io.WriteToBigQuery(
        tabela,
        schema=table_schema,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        custom_gcs_temp_location='gs://curso-apachebeam-dataflow/temp')
)

p1.run()
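
Since the goal is to launch this template automatically for each new file, it may help to expose the input path as a runtime parameter instead of hard-coding it, so every launch can point at the file that triggered it. Below is a minimal sketch of that change; the option name "--input" and the options class are assumptions, not part of the original code.

from apache_beam.options.pipeline_options import PipelineOptions

class StorageToBigQueryOptions(PipelineOptions):
    # add_value_provider_argument turns '--input' into a template (runtime)
    # parameter that can be supplied when the template is launched.
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://curso-apachebeam-dataflow/entrada/dataset.csv',
            help='GCS path of the CSV file to read.')

custom_options = pipeline_options.view_as(StorageToBigQueryOptions)

# ReadFromText accepts a ValueProvider, so the read step above would become:
#   | "Importar Dados" >> beam.io.ReadFromText(custom_options.input,
#                                              skip_header_lines=2)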



Comments (1)

辞别 2025-02-01 00:50:53


If you will have a high throughput of file writes, you can use the Dataflow template "Text Files on Cloud Storage to BigQuery (Stream)".

This will create a streaming pipeline.

Here we have an example:
https://cloud.google.com/dataflow/docs/guides/templates/provided-streaming#text-files-on-cloud-storage-to-bigquery-stream
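
For reference, that provided template is a classic template, so it can be launched with the Dataflow templates API. A rough Python sketch, assuming google-api-python-client and Application Default Credentials are available; the parameter names come from the linked documentation (double-check them there), and all bucket paths, the JavaScript UDF and the BigQuery schema JSON file are placeholders you would have to create:

# Rough sketch: launch the provided "Text Files on Cloud Storage to BigQuery
# (Stream)" template via the Dataflow REST API.
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().locations().templates().launch(
    projectId='curso-dataflow-beam-347613',
    location='southamerica-east1',
    gcsPath='gs://dataflow-templates/latest/Stream_GCS_Text_to_BigQuery',
    body={
        'jobName': 'storage-to-bigquery-stream',
        'parameters': {
            'inputFilePattern': 'gs://curso-apachebeam-dataflow/entrada/*.csv',
            'JSONPath': 'gs://curso-apachebeam-dataflow/config/schema.json',
            'outputTable': 'curso-dataflow-beam-347613:projeto_curso_dataflow.Curso_dataflow_projeto',
            'javascriptTextTransformGcsPath': 'gs://curso-apachebeam-dataflow/config/transform.js',
            'javascriptTextTransformFunctionName': 'transform',
            'bigQueryLoadingTemporaryDirectory': 'gs://curso-apachebeam-dataflow/temp',
        },
    },
)
print(request.execute()['job']['id'])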

But if files arrive in your bucket at very wide time intervals, an approach based on Cloud Functions will be better.

Here we have a good step-by-step:
https://medium.com/@aishwarya.gupta3/cloud-function-to-start-a-data-flow-job-on-a-new-file-upload-in-google-cloud-storage-using-trigger-30270b31a06d
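
The idea in that post, roughly, is a Cloud Function that fires on the bucket's object-finalize event and launches your staged template for every new file. Here is a minimal sketch along those lines, assuming a 1st-gen Python Cloud Function with a google.storage.object.finalize trigger, google-api-python-client in requirements.txt, and the hypothetical '--input' runtime parameter sketched after the pipeline code above:

# main.py of a Cloud Function triggered by google.storage.object.finalize
# on the input bucket; it launches the classic template staged earlier.
import time
from googleapiclient.discovery import build

PROJECT = 'curso-dataflow-beam-347613'
REGION = 'southamerica-east1'
TEMPLATE = 'gs://curso-apachebeam-dataflow/template/storage_to_bigquery'

def trigger_dataflow(event, context):
    """Launch the staged Dataflow template for the file that was uploaded."""
    file_path = 'gs://{}/{}'.format(event['bucket'], event['name'])
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId=PROJECT,
        location=REGION,
        gcsPath=TEMPLATE,
        body={
            # Unique job name per launch to avoid clashing with active jobs.
            'jobName': 'storage-to-bigquery-{}'.format(int(time.time())),
            # 'input' only has an effect if the template exposes it as a
            # runtime (ValueProvider) parameter; otherwise the template keeps
            # its hard-coded input path.
            'parameters': {'input': file_path},
        },
    )
    response = request.execute()
    print('Launched Dataflow job {}'.format(response['job']['id']))

The function's service account also needs permission to launch Dataflow jobs (for example roles/dataflow.developer) plus access to the template and temp buckets.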

See ya.
