如何使用气流作曲家将文件从Linux服务器上传到Google Cloud Storage
我正在尝试使用气流将本地文件从Linux服务器传输到Google Cloud Storage,但是当我运行DAG时,该文件将被上传。 代码 from airflow.providers.google.…
有没有办法从气流中的XCOM_PULL中解析信息?
因此,我正在与之合作的是,我有一个DAG,它具有通过任务传递的特定信息,一切都可以正常工作。该文件需要存储到 Reports/文件夹中,以使以下任务正确…
BigQueryInsertJoboperator Dryrrun正在返回成功,而不是在作曲家(气流)上失败
使用 bigQueryInsertJoboperator 并设置配置以在有缺陷的.sql文件/硬编码查询上执行干式运行,即使任务应该失败,该任务也会成功。与Dryrun一起运行为…
无法使用公共IP连接到Cloud Composer的Cloud SQL
我正在尝试将Google Composer的实例与Google Cloud Postgres联系起来。我正在尝试通过公共IP来做到这一点。 DAG错误是说我无法连接到服务器。 DAG使用…
在Google Cloud Composer中配置气流SSH连接
我正在尝试配置从Google Cloud Composer环境上的气流UI到前提Posgresql Server的SSH连接 我应该在哪里存储我的私钥? 如何传递到SSH连接配置专用密钥…
如何在一个GCP项目和一个位置中查询一个BigQuery表,然后将结果写入另一个项目中的表格,而另一个位置则使用气流?
我需要在一个GCP项目(例如#1)和一个位置(EU)中查询一个大Query表,并将结果写入另一个项目(例如#2)和另一个位置(US)的表格(US)。 作曲家/…
使用kubernetespoderator时,如何将Cloud Composer 2 Worker存储限制超过10GB?
我已经配置了Cloud Composer 2 workloads_config { worker { cpu = 2 memory_gb = 6 storage_gb = 10 min_count = 1 max_count = 4 } } 我无法在Terra…
哪个GCP组件用于从API获取数据
我在GCP组件之间有点困惑,这是我的用例: 每天,我需要从外部API(API返回JSON数据)获取数据,将其存储在GCS中,然后将其加载到BigQuery中,我已经…
GCP作曲家v1.18.6和2.0.10与Cloudsqlproxyrunner不相容
在我的 Composer 气流dags,我一直在使用 cloudsqlproxyrunner 连接到我的云SQL实例。 However, after updating Google Cloud Composer from v1.18.4 …
从云作曲家触发gke cronjob工作量
我有一个正在运行的Google Cloud Composer环境。在这个作曲家中,我想协调执行GKE CRONJOB工作负载和Google DataFlow作业。对于DataFlow作业部分,我…
DBT和Google Cloud Composer PYPI依赖性问题
我目前正在使用Composer版本 2.0.9 和AirFlow版本 2.1.4 运行Google Cloud Composer。我正在尝试安装DBT的最新版本( 1.0.4 for Core和 1.0.0 for Big…
pubsubpullsensor需要很长时间来响应一条消息
我正在用动态订阅名称在云作曲家代码中创建订阅。之后,我有一个Pubsubpullsensor操作员。在此任务启动之前,我会用某些属性手动将消息发送到该主题,…
GCP-Cloud Composer:Secret Manager Access Variable.json
我尝试为我的作曲家配置秘密管理器(Ver 1.16,气流1.10),但是我的情况很奇怪。在我的作曲家中,我使用 variable.json 文件来管理气流中的变量 # va…
在airflow中为DAG中的所有任务高效设置task_concurrency
我的要求:我想避免在气流 2.1.4 中同一任务的重叠任务运行。任务的后续运行只能在其前面的 task_run 完成后开始(成功或错误都可以)。我找到了这个…
有没有办法控制气流中的并行任务组?我们可以使用池做到这一点吗?
我正在尝试并行执行不同集的多个类似任务,但只想运行其中一些任务,同时让其他任务组等待完成。例如,如果我有 5 个任务组,我想并行运行其中 3 个任…