Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath

Published 2025-01-17 22:24:59


I am trying to create a PyFlink application with the Table API and Elasticsearch as a sink.

    from pyflink.table import TableEnvironment, EnvironmentSettings

    def log_processing():
        env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
        t_env = TableEnvironment.create(env_settings)
        t_env.get_config().get_configuration().set_string(
            "pipeline.classpaths",
            "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar;file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1")

        sink_ddl = """
            CREATE TABLE myUserTable (
              user_id STRING,
              user_name STRING,
              uv BIGINT,
              pv BIGINT,
              PRIMARY KEY (user_id) NOT ENFORCED
            ) WITH (
              'connector' = 'elasticsearch-7',
              'hosts' = 'http://localhost:9200',
              'index' = 'users'
            )
        """
        t_env.execute_sql(sink_ddl)
        print(sink_ddl)
        sink_table = t_env.sql_query("SELECT * FROM myUserTable")


    if __name__ == '__main__':
        log_processing()

When I try to run the above code, it shows the following error:

    Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

    Available factory identifiers are:

      • blackhole
      • datagen
      • filesystem
      • kafka
      • print
      • upsert-kafka

    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
    at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)

How do I get rid of this problem?


Comments (1)

甲如呢乙后呢 (2025-01-24 22:24:59):


Could you double check if the path file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1 really exists. It seems that it's missing the .jar suffix.
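As a quick sanity check before submitting the job, the `pipeline.classpaths` value can be built programmatically so that a missing `.jar` suffix fails fast instead of Flink silently failing to find the connector factory. A minimal sketch using only the standard library — the `build_classpaths` helper is hypothetical, and the paths are placeholders from the question:

```python
def build_classpaths(jar_urls):
    """Join connector jar URLs into the semicolon-separated string that
    Flink's "pipeline.classpaths" option expects, rejecting any entry
    that does not end in ".jar" (a URL without the suffix will not
    resolve to the connector jar, so its factories are never loaded)."""
    for url in jar_urls:
        if not url.endswith(".jar"):
            raise ValueError(f"missing .jar suffix: {url}")
    return ";".join(jar_urls)

jars = [
    "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar",
    # Note the ".jar" suffix here -- the question's original value lacked it.
    "file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1.jar",
]

classpaths = build_classpaths(jars)
```

The resulting string can then be passed to `set_string("pipeline.classpaths", classpaths)` exactly as in the question's code.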
