检索XCOMM值并将其传递给EMR操作员,气流
我正在尝试从Python运营商中检索价值,并将其传递给“ EMR创建作业”和“添加步骤操作员”。我如何在spark_steps中传递此值, 我已经使用task_instanc…
如何从Python函数中检索数据并将其用于EMR操作员
气流版本:2.0.2 试图通过从AWS Secrets Manager中重试数据来创建EMR群集。 我正在尝试编写气流DAG,我的任务是从此get_secret函数中获取数据,并在sp…
如何使用MWAA气流DAG克隆EMR群集
我想从EMR侧克隆预先存在的群集 /先前的终止群集。 但是,我想使用MWAA ENV的气流DAG实现这一目标,而不是手动进行。 当前,我能够使用 emraddstepsop…
在Google的Cloud Composer环境中安装和管理不同版本的自定义WHL文件
我从Google Cloud Composer环境开始。我想组织我的项目如下: dags/ ├── project-1 | ├── dist | | ├── my_custom-0.0.1-py3-none-any.whl |…
气流 - 在循环任务之前添加一个任务,该任务取决于先前的任务
我试图使我的任务依赖关系正确。到目前为止,我有8个任务取决于以前的任务,这很棒,但是我想先在这些任务之前运行一个任务,但是我似乎无法正确正确…
气流:Job< Job-ID>在结束之前被杀死(可能是由于记忆不足)
我有一个有两个任务的线性dag-第一个任务的 truthy/falsy 值决定是否执行第二个任务。我正在使用ShortCircuitoter进行第一个任务,以便如果需要,可以…
使用气流SQSSENSOR轮询多个SQS消息
我正在使用此SQSSENSOE设置进行民意调查消息, fetch_sqs_message = SQSSensor( task_id="...", sqs_queue="...", aws_conn_id="aws_default", max_me…
GCP Composer 2(气流2)数据Proc Operator-将软件包传递给PYSPARK_JOB
我正在使用GCP Composer2安排Pyspark(结构化流)作业, Pyspark代码读/写入Kafka。 DAG使用运算符 - dataproccreateclusteroperator (创建一个GKE群…
气流任务手动触发,但仍处于排队状态
我使用的是气流2.3.1,并与本地执行程序一起使用MS SQL Server作为元数据DB。 我试图手动执行DAG,它显示为排队,但什么也没有发生。当触发此DAG时,…
气流没有为我的情况触发sla_miss_callback
我遵循了文档和以前的stackoverflow链接,例如: airflow sla_miss_callback函数a> 对于我的情况,我仍然无法触发 sla_miss_callback 。 我有一个顶级…
在气流中设置多个DAG依赖性
我有多个摄入量表 - > 'dag ingd1','dag ingd2','dag ingd3',因此在哪些数据中摄取数据。 摄入DAG成功完成后,我想运行一个转换dag -> 'da…
我们可以在出口条件下创建气流DAG周期吗?
我们有一个巨大的CSV文件,气流DAG看起来好像 >> read_csv >> apply filter >> store in database 我们必须将数据从CSV读取器操作员传递给过滤器操作…
- 共 1 页
- 1