使用NIFI中的PUTSQL获取存储过程的输出

发布于 2025-01-31 06:43:42 字数 2753 浏览 5 评论 0原文

我必须在NIFI中获取Oracle存储过程的输出。 我已经尝试使用以下SQL语句的PUTSQL:

声明varchar2;开始pkg_test.p_test(1,out);结束;

它工作正常,但只是执行脚本。 如何获得输出“输出”的价值? 编辑:我在此处尝试了Groovy脚本:

https://community.cloudera.com/t5/support-questions/does-executesql-processor-plocessor-pallocessor-pallocessor-palle-to-execute-secute-stored-procedure/td-pdd-p/td-p/1589222

我会收到以下错误:

2022-06-17 13:38:53,353 ERROR [Timer-Driven Process Thread-9] oanpgroovyx.ExecuteGroovyScript ExecuteGroovyScript[id=26ab18f1-3b0c-18cf-d90b-3d5904676458] groovy.lang.MissingMethodException: No signature of method :script6a6d0a35 $ _run_closure1.docall()适用于参数类型:(字符串,字符串,java.sql.date,null,null,string,null,null,null,null,string ...)valut ,null,ok,null,null,...]:groovy.lang.missingmethodexception:无签名:script6a6d0a35 $ _run_closure1.docall()适用于参数类型:(字符串,字符串,string,java,java.sql.date,null,null,null,null,null,null,null,null,null,null,null ,字符串,null,null,string ...)值:[xxxx,xxxx,2022-05-30,null,ok,ok,null,null,null,...]

at org.codehaus.groovy.runtime.metaclass.closuremetaclass.invokemethod(closuremetaclass.java:255)

,所以我有过程的输出,但我会收到错误!

脚本:

import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import java.sql.ResultSet


////Get the session values from Nifi flow Start 
def flowFile = session.get()
if(!flowFile) return
  String TYPE_NOTIFICATION = flowFile.getAttribute('TYPE_NOTIFICATION')
  String ID_NOTIFICATION = flowFile.getAttribute('ID_NOTIFICATION')
////Get the session values from Nifi flow END
 
 

String sqlString ="""{call PKG_TEST.P_TEST(?,?,?,?,?,?,?,?,?,?,?)}""";

def parametersList = [ID_NOTIFICATION, TYPE_NOTIFICATION,Sql.VARCHAR,Sql.VARCHAR,Sql.DATE,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.DATE ];
 
 
SQL.mydbxx.call(sqlString, parametersList) {out1, out2,...->
    flowFile.putAttribute("out1",out1)...
};

 

session.transfer(flowFile, REL_SUCCESS)

我存储过程的签名: 谢谢你!

I have to get the output of Oracle stored procedure in Nifi.
I've tried PutSql with the following sql statement :

declare out VARCHAR2 ; begin PKG_TEST.P_TEST(1,out); end;

It works fine but it just executes the script.
How can I get the value of output 'out' ?
Edit : I tried the Groovy script here :

https://community.cloudera.com/t5/Support-Questions/Does-ExecuteSQL-processor-allow-to-execute-stored-procedure/td-p/158922

I get the following error :

2022-06-17 13:38:53,353 ERROR [Timer-Driven Process Thread-9] o.a.n.p.groovyx.ExecuteGroovyScript ExecuteGroovyScript[id=26ab18f1-3b0c-18cf-d90b-3d5904676458] groovy.lang.MissingMethodException: No signature of method: Script6a6d0a35$_run_closure1.doCall() is applicable for argument types: (String, String, java.sql.Date, null, String, null, null, String...) values: [xxxx, xxxx, 2022-05-30, null, OK, null, null, ...]: groovy.lang.MissingMethodException: No signature of method: Script6a6d0a35$_run_closure1.doCall() is applicable for argument types: (String, String, java.sql.Date, null, String, null, null, String...) values: [xxxx,xxxx, 2022-05-30, null, OK, null, null, ...]

at org.codehaus.groovy.runtime.metaclass.ClosureMetaClass.invokeMethod(ClosureMetaClass.java:255)

So I have the output of procedure but I get the error !

Script :

import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import java.sql.ResultSet


////Get the session values from Nifi flow Start 
def flowFile = session.get()
if(!flowFile) return
  String TYPE_NOTIFICATION = flowFile.getAttribute('TYPE_NOTIFICATION')
  String ID_NOTIFICATION = flowFile.getAttribute('ID_NOTIFICATION')
////Get the session values from Nifi flow END
 
 

String sqlString ="""{call PKG_TEST.P_TEST(?,?,?,?,?,?,?,?,?,?,?)}""";

def parametersList = [ID_NOTIFICATION, TYPE_NOTIFICATION,Sql.VARCHAR,Sql.VARCHAR,Sql.DATE,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.DATE ];
 
 
SQL.mydbxx.call(sqlString, parametersList) {out1, out2,...->
    flowFile.putAttribute("out1",out1)...
};

 

session.transfer(flowFile, REL_SUCCESS)

Signature of my stored procedure :
enter image description here
enter image description here
Thank you!

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

无语# 2025-02-07 06:43:42

我无法测试它 - 因此,它只是参考代码

executeGroovyScript处理器

add sql.mydb处理器级别上的参数并将其链接到必需的DBCP池。

将此大约设置为脚本主体

def ff=session.get()
if(!ff)return

def statement = '''
  declare out VARCHAR2; 
  begin
    PKG_TEST.P_TEST(?, out);
  end;
  ? := out;
'''
//parameters for each ? placeholder
def params = [
  ff.param_input as Long, //get parameter value from flowfile attribute
  SQL.mydb.VARCHAR, //out varchar parameter https://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html#VARCHAR
]
SQL.mydb.call(statement, params){p_out-> //we have only one out patameter
    //closure to process output parameters
    ff.param_output = p_out //assign value into flowfile attribute
}

//transfer flowfile to success
REL_SUCCESS << ff

i can't test it - so, it's just a reference code

use ExecuteGroovyScript processor

add SQL.mydb parameter on the level of processor and link it to required DBCP pool.

set approximately this as a script body

def ff=session.get()
if(!ff)return

def statement = '''
  declare out VARCHAR2; 
  begin
    PKG_TEST.P_TEST(?, out);
  end;
  ? := out;
'''
//parameters for each ? placeholder
def params = [
  ff.param_input as Long, //get parameter value from flowfile attribute
  SQL.mydb.VARCHAR, //out varchar parameter https://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html#VARCHAR
]
SQL.mydb.call(statement, params){p_out-> //we have only one out patameter
    //closure to process output parameters
    ff.param_output = p_out //assign value into flowfile attribute
}

//transfer flowfile to success
REL_SUCCESS << ff
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文