Airflow: SnowflakeOperator: return the last query ID to XCom

Posted on 2025-02-10 02:56:26

I have a SQL file for Snowflake containing the queries shown below. I would like the SnowflakeOperator to return a value so that I can pass it to the next task through XCom.

How can I get only the last query ID returned to XCom? Basically, I need to push Snowflake's last query ID to XCom.

In SQL File:

select columns from tableA ;
select last_query_id();

Error: Multiple SQL statements in a single API call are not supported; use one API call per statement instead.

Or is there a way to get the query ID that Snowflake reports for the statement returned to XCom?

Code:

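from typing import Any, Optional

# Import paths as provided by the apache-airflow-providers-snowflake package.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


class LastQueryId(SnowflakeOperator):

    def execute(self, context: Any) -> Optional[str]:
        """Run the query on Snowflake and return the last query ID."""
        self.log.info('Executing: %s', self.sql)
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id,
                             warehouse=self.warehouse, database=self.database,
                             role=self.role, schema=self.schema, authenticator=self.authenticator)
        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
        self.query_ids = hook.query_ids

        # The value returned from execute() is pushed to XCom by Airflow.
        if self.do_xcom_push and len(self.query_ids) > 0:
            return self.query_ids[-1]
        return None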

UPDATED: I was able to get the Snowflake query ID with the code above, but the task log also contains the rows returned by the query. How can I keep those out of the log?

[2022-06-23, 20:43:39 UTC] {cursor.py:696} INFO - query: [SELECT  modifieddate, documentdate...]
[2022-06-23, 20:43:40 UTC] {cursor.py:720} INFO - query execution done
[2022-06-23, 20:43:56 UTC] {snowflake.py:307} INFO - Statement execution info - {'MODIFIEDDATE': datetime.datetime(2022, 6, 23, 11, 42, 34, 233000), 'DOCUMENTDATE': datetime.datetime(2015, 10, 1, 0, 0)...}
[2022-06-23, 20:43:56 UTC] {snowflake.py:307} INFO - Statement execution info - {'MODIFIEDDATE': datetime.datetime(2022, 6, 23, 13, 50, 45, 377000), 'DOCUMENTDATE': datetime.datetime(2021, 7, 1, 0, 0)...}
[2022-06-23, 20:43:56 UTC] {snowflake.py:307} INFO - Statement execution info - {'MODIFIEDDATE': datetime.datetime(2022, 6, 23, 11, 36, 51, 583000), 'DOCUMENTDATE': datetime.datetime(2015, 8, 31, 0, 0)...}
....
....
....
[2022-06-23, 20:43:56 UTC] {snowflake.py:311} INFO - Rows affected: 22116
[2022-06-23, 20:43:56 UTC] {snowflake.py:312} INFO - Snowflake query id: 01a5259b-0501-98f3-0251-830144baa623
[2022-06-23, 20:43:56 UTC] {connection.py:509} INFO - closed


Comments (1)

喜爱纠缠 2025-02-17 02:56:26

SnowflakeOperator already stores the query_ids, but it does not push them to XCom.

You can create a custom operator like this:

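from typing import Any, Optional

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


class MySnowflakeOperator(SnowflakeOperator):

    def execute(self, context: Any) -> Optional[str]:
        """Run the query on Snowflake and return the last query ID."""
        self.log.info('Executing: %s', self.sql)
        hook = self.get_db_hook()
        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
        self.query_ids = hook.query_ids

        # The value returned from execute() is pushed to XCom by Airflow.
        if self.do_xcom_push and len(self.query_ids) > 0:
            return self.query_ids[-1]  # last query_id
        return None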

If you want to keep the original operator's behavior, you can instead push the ID under a separate XCom key:

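from typing import Any

from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


class MySnowflakeOperator(SnowflakeOperator):

    def execute(self, context: Any) -> Any:
        # Let the parent run the SQL; it populates self.query_ids.
        parent_return_value = super().execute(context)
        if self.do_xcom_push and len(self.query_ids) > 0:
            # Push the last query ID under its own XCom key.
            self.xcom_push(
                context,
                key="last_query_id",
                value=self.query_ids[-1],
            )
        return parent_return_value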
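
On the follow-up about result rows showing up in the task log: the "Statement execution info" lines in the question are INFO records emitted by the hook (snowflake.py), so one possible workaround is to raise that logger's level while the query runs. The sketch below uses only the standard logging module. The helper name quiet_logger and the constant HOOK_LOGGER are hypothetical, and the logger name is an assumption based on how Airflow usually names loggers; checking hook.log.name in your own environment is the safer source.

```python
import logging
from contextlib import contextmanager
from typing import Iterator

# Assumed logger name ("<module>.<ClassName>"); verify with hook.log.name.
HOOK_LOGGER = "airflow.providers.snowflake.hooks.snowflake.SnowflakeHook"


@contextmanager
def quiet_logger(name: str, level: int = logging.WARNING) -> Iterator[None]:
    """Temporarily raise the level of the named logger, then restore it."""
    logger = logging.getLogger(name)
    previous = logger.level
    logger.setLevel(level)  # records below `level` are dropped while active
    try:
        yield
    finally:
        logger.setLevel(previous)  # restore even if the wrapped code raises
```

Inside a custom operator this could wrap the call that runs the SQL, for example `with quiet_logger(hook.log.name): hook.run(...)`. Note the trade-off: the "Rows affected" and "Snowflake query id" lines come from the same logger at INFO level, so they would be hidden as well. The cursor.py lines appear to come from the Snowflake connector's own logger and would need the same treatment if they should also be suppressed.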