如何查找标识符“elasticsearch-7”的任何工厂实现“org.apache.flink.table.factories.DynamicTableFactory”;在类路径中
我正在尝试使用Table API和Elasticsearch作为水槽创建一个Pyflink应用程序。
from pyflink.table import TableEnvironment, EnvironmentSettings
def log_processing():
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar;file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1")
sink_ddl = """
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
)
"""
t_env.execute_sql(sink_ddl)
print(sink_ddl)
sink_table = t_env.sql_query("SELECT * FROM myUserTable")
if __name__ == '__main__':
log_processing()
当我试图运行上述代码时,显示以下错误:
`由:org.apache.flink.table.api.api.validationexception:找不到任何实现org.apache的标识符“ elasticsearch-7”的工厂。 flink.table.factories.dynamictable factory'在类路径中。
可用的工厂标识符是:
- 黑洞
- datagen
- 文件系统
- kafka kafka
print upsert-kafka
at org.apache.flink.table.table.table.factories.factoryutil.discoverfactory(factricutil.java:319)
at org.apache.flink.table.factories.factoryutil.enrichnomatchingconnectorError(factricutil.java:463)\
如何摆脱此问题。
I am trying to create a pyflink application with table API and elasticsearch as sink.
from pyflink.table import TableEnvironment, EnvironmentSettings
def log_processing():
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///path_to/flink-sql-connector-kafka_2.12-1.13.1.jar;file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1")
sink_ddl = """
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING,
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'users'
)
"""
t_env.execute_sql(sink_ddl)
print(sink_ddl)
sink_table = t_env.sql_query("SELECT * FROM myUserTable")
if __name__ == '__main__':
log_processing()
When I am trying to run the above code, showing the below error:
`Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
- blackhole
- datagen
- filesystem
- kafka
print upsert-kafka
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)\
How to get rid of this problem.
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论
评论(1)
您能否仔细检查路径
file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1
是否确实存在。似乎缺少.jar
后缀。Could you double check if the path
file:///path_to/flink-sql-connector-elasticsearch7_2.11-1.13.1
really exists. It seems that it's missing the.jar
suffix.