This is my code for calculating the average of each ch[x] from a Kafka source using Apache Flink (PyFlink). I think I have imported all of the necessary libraries, but I'm getting the error below when running the code.
```
from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `t` TIMESTAMP_LTZ(3),
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'energymeter.raw',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json',
        )
    """

@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x*12*2

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
    return x/500

def create_output():
    return """
        CREATE TABLE output (
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `ch1_mod` BIGINT,
            `ch2_mod` BIGINT,
            `ch3_mod` BIGINT,
            `ch4_mod` BIGINT,
            `ch5_mod` BIGINT,
            `ch6_mod` BIGINT,
            `t` TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka'
            'topic' = 'energymeter.processed',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'format' = 'json'
        )
    """

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()
```
This is the error. I have added the SQL Kafka connector flink-sql-connector-kafka_2.11-1.14.4.jar, but nothing seems to work:

```
Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
"CONSTRAINT" ...
"PRIMARY" ...
"UNIQUE" ...
"WATERMARK" ...
<BRACKET_QUOTED_IDENTIFIER> ...
<QUOTED_IDENTIFIER> ...
<BACK_QUOTED_IDENTIFIER> ...
<HYPHENATED_IDENTIFIER> ...
<IDENTIFIER> ...
<UNICODE_QUOTED_IDENTIFIER> ...
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
    ... 13 more
```
There are many problems with your program, e.g.:

- the comma is missing after `'connector' = 'kafka'` in the output table's WITH clause,
- there is an extra trailing comma after `` `t` TIMESTAMP_LTZ(3) `` in the input table (that is the ")" at line 11 the parser complains about),
- there is an extra trailing comma after `'format' = 'json'` in the input table's WITH clause,
- Python UDFs have to be registered with `create_temporary_function`, not passed to `execute_sql`,
- and the SELECT clause is not consistent with the sink table `output` definition (the column order does not match the DDL).

I have made some modifications to it, as follows:
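Here is a sketch of the corrected program along those lines (the unused energy_consumption UDF is dropped for brevity, and the commented-out pipeline.jars path is only a placeholder for wherever your connector jar actually lives):

```
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

def create_input():
    return """
        CREATE TABLE input (
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `t` TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'energymeter.raw',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'json'
        )
    """

def create_output():
    return """
        CREATE TABLE output (
            `gw_id` VARCHAR,
            `ch1` BIGINT,
            `ch2` BIGINT,
            `ch3` BIGINT,
            `ch4` BIGINT,
            `ch5` BIGINT,
            `ch6` BIGINT,
            `ch1_mod` BIGINT,
            `ch2_mod` BIGINT,
            `ch3_mod` BIGINT,
            `ch4_mod` BIGINT,
            `ch5_mod` BIGINT,
            `ch6_mod` BIGINT,
            `t` TIMESTAMP_LTZ(3)
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'energymeter.processed',
            'properties.bootstrap.servers' = '192.168.0.34:9092',
            'format' = 'json'
        )
    """

@udf(result_type=DataTypes.BIGINT())
def average_power(x):
    return x * 12 * 2

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# If the Kafka connector jar is not already on the classpath, point PyFlink
# at it (the path below is a placeholder):
# table_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     "file:///path/to/flink-sql-connector-kafka_2.11-1.14.4.jar")

# Register the Python UDF so SQL can call it; execute_sql() only takes SQL strings.
table_env.create_temporary_function("average_power", average_power)

table_env.execute_sql(create_input())
table_env.execute_sql(create_output())

# The SELECT list now matches the sink schema exactly:
# gw_id, ch1..ch6, ch1_mod..ch6_mod, t.
table_env.execute_sql("""
    INSERT INTO output
    SELECT gw_id,
           ch1, ch2, ch3, ch4, ch5, ch6,
           average_power(ch1), average_power(ch2), average_power(ch3),
           average_power(ch4), average_power(ch5), average_power(ch6),
           t
    FROM input
""").wait()
```

Note that INSERT INTO maps the SELECT columns to the sink columns by position, not by name, so the order has to line up with the output DDL.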