我在运行以下pyflink代码时会遇到此错误

发布于 2025-01-24 21:46:05 字数 3317 浏览 3 评论 0原文

这是使用Apache Flink(Pyflink)从KAFKA源中计算每个CH [X]平均值的代码 我想我已经导入了所有必要的库

,并且在运行代码

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3),
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json',
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  return x/500
def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka'
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()

错误时会遇到此错误,这是我添加了SQL Kafka连接器Flink-SQL-Connector-Kafka_2.11-1.1.14.4.4.4.4.4.4.44.


Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
        ... 13 more ```

this is the code for calculating average of each ch[x] from a kafka source using apache flink(pyflink)
i think i have imported all of the necessary libraries

And I'm getting this error when running the code

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3),
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json',
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  return x/500
def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka'
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()

Error is this i have added sql kafka connector flink-sql-connector-kafka_2.11-1.14.4.jar but nothing seems to work


Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
        ... 13 more ```

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

青瓷清茶倾城歌 2025-01-31 21:46:05

您的程序有很多问题,例如

  • 'Connector'='Kafka',在``t timestamp_ltz(3), and '之后丢失逗号'='kafka''格式'='json',
  • 应该使用create_temporary_function注册Python UDFS,而不是execute_sql
  • 出现在 select 等级中的字段订单与接收器表输出定义不一致,

我对其进行了一些修改:如下:

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  return x/500

def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()

There are many problems with your program, e.g.

  • Missing comma after 'connector' = 'kafka', extra comma after ``tTIMESTAMP_LTZ(3), and 'format' = 'json',
  • Should use create_temporary_function to register Python UDFs instead of execute_sql
  • The fields order appearing in the SELECT clause is not consistent with the sink table output definition

I have made some modifications to it as following:

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json'
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2

@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  return x/500

def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.create_temporary_function("average_power", average_power)
table_env.create_temporary_function("energy_consumption", energy_consumption)
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文