Airflow: SnowflakeOperator: return the last query ID to XCom
I have a Snowflake SQL file with queries like the ones below. I'd like the SnowflakeOperator to return a value so that I can pass it to the next task via XCom.
How can I return only the last query ID to XCom? Basically, I need to get Snowflake's last query ID into XCom.
In the SQL file:
select columns from tableA ;
select last_query_id();
Error: Multiple SQL statements in a single API call are not supported; use one API call per statement instead.
Or is there a way to get the query ID shown in the log below returned to XCom?
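The error asks for one call per statement. As a minimal sketch (the helper name `split_sql_statements` is hypothetical, and it assumes no `;` appears inside string literals, quoted identifiers or comments), the file's contents can be turned into one string per statement:

```python
from typing import List


def split_sql_statements(sql: str) -> List[str]:
    """Naively split a SQL script into individual statements on ';'.

    Assumption: no ';' appears inside string literals, quoted identifiers
    or comments. Empty fragments (e.g. after the final ';') are dropped.
    """
    statements: List[str] = []
    for fragment in sql.split(";"):
        statement = fragment.strip()
        if statement:
            statements.append(statement)
    return statements


script = """select columns from tableA ;
select last_query_id();"""
print(split_sql_statements(script))
# ['select columns from tableA', 'select last_query_id()']
```

As far as I know, `SnowflakeHook.run()` in the Airflow 2.x Snowflake provider accepts a list of statements as well as a single string, and appends one ID per executed statement to `hook.query_ids`. If `select last_query_id()` stays in that list, `hook.query_ids[-1]` is the ID of that statement itself, so it is simpler to drop it and use the ID recorded for the `select` on `tableA`.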
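Code:

from typing import Any, Optional

from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator


class LastQueryId(SnowflakeOperator):
    def execute(self, context: Any) -> Optional[str]:
        """Run the query on Snowflake and return the last query ID."""
        self.log.info('Executing: %s', self.sql)
        hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id,
                             warehouse=self.warehouse, database=self.database,
                             role=self.role, schema=self.schema,
                             authenticator=self.authenticator)
        hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
        # The hook records one query ID per executed statement.
        self.query_ids = hook.query_ids
        # Airflow pushes the return value of execute() to XCom ("return_value").
        if self.do_xcom_push and len(self.query_ids) > 0:
            return self.query_ids[-1]
        return None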
UPDATE: With the code above I was able to get the Snowflake query ID, but the log also shows the query results row by row. How can I keep those rows out of the log?
[2022-06-23, 20:43:39 UTC] {cursor.py:696} INFO - query: [SELECT modifieddate, documentdate...]
[2022-06-23, 20:43:40 UTC] {cursor.py:720} INFO - query execution done
[2022-06-23, 20:43:56 UTC] {snowflake.py:307} INFO - Statement execution info - {'MODIFIEDDATE': datetime.datetime(2022, 6, 23, 11, 42, 34, 233000), 'DOCUMENTDATE': datetime.datetime(2015, 10, 1, 0, 0)...}
[2022-06-23, 20:43:56 UTC] {snowflake.py:307} INFO - Statement execution info - {'MODIFIEDDATE': datetime.datetime(2022, 6, 23, 13, 50, 45, 377000), 'DOCUMENTDATE': datetime.datetime(2021, 7, 1, 0, 0)...}
[2022-06-23, 20:43:56 UTC] {snowflake.py:307} INFO - Statement execution info - {'MODIFIEDDATE': datetime.datetime(2022, 6, 23, 11, 36, 51, 583000), 'DOCUMENTDATE': datetime.datetime(2015, 8, 31, 0, 0)...}
....
....
....
[2022-06-23, 20:43:56 UTC] {snowflake.py:311} INFO - Rows affected: 22116
[2022-06-23, 20:43:56 UTC] {snowflake.py:312} INFO - Snowflake query id: 01a5259b-0501-98f3-0251-830144baa623
[2022-06-23, 20:43:56 UTC] {connection.py:509} INFO - closed
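The "Statement execution info" lines come from the hook's own logger (`snowflake.py`), not from the connector's `cursor.py`. One possible approach, assuming `hook.log` is a standard `logging.Logger` (Airflow's `LoggingMixin` provides it), is to raise that logger's level only while `hook.run()` executes. The helper name `quiet_logger` is hypothetical:

```python
import logging
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def quiet_logger(logger: logging.Logger, level: int = logging.WARNING) -> Iterator[None]:
    """Temporarily raise `logger`'s level so records below `level` are dropped.

    The previous level is restored on exit, even if the body raises.
    """
    previous_level = logger.level
    logger.setLevel(level)
    try:
        yield
    finally:
        logger.setLevel(previous_level)
```

In the question's `execute()`, the call would become `with quiet_logger(hook.log): hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)`. Note that this also hides the hook's "Rows affected" and "Snowflake query id" INFO lines, since they come from the same logger; the `cursor.py` lines use the connector's logger and are unaffected.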
Answer:
SnowflakeOperator already stores the query_ids, but it does not push them to XCom.
You can create a custom operator like this:
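A minimal sketch of the core of such an operator's `execute()`, written against any hook object so the logic can be checked in isolation. It assumes, as the question's code does, that `hook.run()` fills `hook.query_ids`; the function name `run_and_get_last_query_id` is hypothetical:

```python
from typing import Any, Iterable, Mapping, Optional, Union


def run_and_get_last_query_id(
    hook: Any,
    sql: Union[str, Iterable[str]],
    autocommit: bool = False,
    parameters: Optional[Mapping[str, Any]] = None,
) -> Optional[str]:
    """Run `sql` through `hook` and return the ID of the last statement, if any.

    Assumption: hook.run() stores one query ID per executed statement in
    hook.query_ids, as SnowflakeHook does in the Airflow 2.x provider.
    """
    hook.run(sql, autocommit=autocommit, parameters=parameters)
    query_ids = list(getattr(hook, "query_ids", None) or [])
    # Returning a value from execute() is what makes Airflow push it to XCom.
    return query_ids[-1] if query_ids else None
```

In a `SnowflakeOperator` subclass, `execute()` would build the hook as in the question and `return run_and_get_last_query_id(hook, self.sql, self.autocommit, self.parameters)`. The returned ID replaces the operator's usual return value under the `return_value` XCom key.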
If you want to keep the original operator's functionality, you can do:
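One way to keep the original behaviour is to call the parent `execute()` unchanged and push the recorded IDs under separate XCom keys, leaving the original return value in place. This is a sketch: the mixin name and the `query_ids` / `last_query_id` keys are hypothetical, and it assumes the parent `execute()` sets `self.query_ids` and that `context["ti"]` is the task instance (whose `xcom_push(key=..., value=...)` Airflow provides):

```python
from typing import Any, Dict, List


class PushQueryIdsMixin:
    """Hypothetical mixin: run the parent operator's execute() unchanged, then
    push the query IDs it recorded to XCom under extra keys."""

    def execute(self, context: Dict[str, Any]) -> Any:
        # Original operator behaviour, including its own return value.
        result = super().execute(context)  # type: ignore[misc]
        query_ids: List[str] = list(getattr(self, "query_ids", None) or [])
        ti = context["ti"]
        ti.xcom_push(key="query_ids", value=query_ids)
        if query_ids:
            ti.xcom_push(key="last_query_id", value=query_ids[-1])
        return result
```

Usage would be `class SnowflakeOperatorWithQueryIds(PushQueryIdsMixin, SnowflakeOperator): pass`, with the mixin listed first so its `execute()` wins in the MRO. A downstream task could then read the ID with `ti.xcom_pull(task_ids="<upstream task id>", key="last_query_id")`.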