Kafka/QuestDB JDBC sink connector: crash when multiple topics are configured
I have a Kafka broker with two topics and a JDBC sink connector that writes to a QuestDB database.
Everything runs in Docker containers.
If I configure the JDBC connector with a single topic (either one), it works fine and all events are transferred to QuestDB.
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @jdbc_sink.json http://connect:8083/connectors
{
  "name": "jdbc_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "topic_1",
    "table.name.format": "${topic}",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "auto.create": "true",
    "insert.mode": "insert",
    "dialect.name": "PostgreSqlDatabaseDialect"
  }
}
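The `curl` call above can also be scripted, which makes it easier to resubmit the connector and query its state after a crash. The following is a minimal sketch using only the Python standard library. It builds the same `POST /connectors` request and a `GET /connectors/jdbc_sink/status` request from the Kafka Connect REST API. The helper names `build_create_request`, `build_status_request` and `send` are hypothetical; the URL and config values are copied from the post.

```python
import json
import urllib.request

# Host and port taken from the curl call in the post.
CONNECT_URL = "http://connect:8083"

# Same content as jdbc_sink.json in the post.
SINK_CONFIG = {
    "name": "jdbc_sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "topic_1",
        "table.name.format": "${topic}",
        "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
        "connection.user": "admin",
        "connection.password": "quest",
        "auto.create": "true",
        "insert.mode": "insert",
        "dialect.name": "PostgreSqlDatabaseDialect",
    },
}


def build_create_request(base_url, connector):
    """Build the POST /connectors request that the curl command sends."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        method="POST",
        headers={"Accept": "application/json", "Content-Type": "application/json"},
    )


def build_status_request(base_url, name):
    """Build the GET /connectors/<name>/status request (connector and task state)."""
    return urllib.request.Request(f"{base_url}/connectors/{name}/status", method="GET")


def send(request):
    """Send a request and decode the JSON reply; needs a reachable Connect worker."""
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

With a reachable worker, `send(build_create_request(CONNECT_URL, SINK_CONFIG))` submits the connector and `send(build_status_request(CONNECT_URL, "jdbc_sink"))` returns its current state, including any task failure trace.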
As soon as I list both topics in the JDBC config, the QuestDB and Connect containers crash.
"topics": "topic_1,topic_2",
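Since each topic works on its own, one way to narrow the problem down could be to register one connector per topic instead of a single connector for both. The sketch below only derives such per-topic definitions from the two-topic config. Whether this avoids the crash is an untested assumption, and the helper name `split_by_topic` and the `jdbc_sink_<topic>` naming scheme are made up for illustration.

```python
import copy
import json

# The two-topic variant of the config from the post.
BASE = {
    "name": "jdbc_sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "topic_1,topic_2",
        "table.name.format": "${topic}",
        "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
        "connection.user": "admin",
        "connection.password": "quest",
        "auto.create": "true",
        "insert.mode": "insert",
        "dialect.name": "PostgreSqlDatabaseDialect",
    },
}


def split_by_topic(connector):
    """Return one connector definition per topic listed in the 'topics' setting."""
    topics = [t.strip() for t in connector["config"]["topics"].split(",") if t.strip()]
    definitions = []
    for topic in topics:
        single = copy.deepcopy(connector)  # leave the input untouched
        single["name"] = f"{connector['name']}_{topic}"  # hypothetical naming scheme
        single["config"]["topics"] = topic
        definitions.append(single)
    return definitions


for definition in split_by_topic(BASE):
    print(json.dumps(definition, indent=2))
```

Each printed definition can be saved to its own JSON file and posted with the same `curl` command as before.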
Connect Dockerfile
FROM confluentinc/cp-kafka-connect-base:7.1.0
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.3.3
connect:
  build:
    context: ./connect
    dockerfile: Dockerfile
  hostname: connect
  container_name: connect
  depends_on:
    - broker1
    - zookeeper
  ports:
    - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "broker1:29092"
    CONNECT_REST_ADVERTISED_HOST_NAME: connect
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
    CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
  networks:
    - broker-kafka
questdb:
  image: questdb/questdb:6.2.2b
  hostname: questdb
  container_name: questdb
  ports:
    - "9000:9000" # REST API and Web Console
    - "9009:9009" # InfluxDB line protocol
    - "8812:8812" # Postgres wire protocol
    - "9003:9003" # Min health server
  volumes:
    - ./questdb:/root/.questdb
  networks:
    - broker-kafka
What is the problem here? Does anyone have an idea?
Below are extracts from the Docker logs of the Connect and QuestDB containers. I cannot post the complete logs (too many characters), so I tried to extract the relevant messages, but frankly I could not identify the decisive error message. Please let me know if you need more logs.
QuestDB log (extract)
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f7dc99486a8, pid=1, tid=38
#
# JRE version: OpenJDK Runtime Environment (17.0.1+12) (build 17.0.1+12-39)
# Java VM: OpenJDK 64-Bit Server VM (17.0.1+12-39, mixed mode, tiered, compressed oops, compressed class ptrs, g1 gc, linux-amd64)
# Problematic frame:
# J 2153 c1 io.questdb.cairo.vm.api.MemoryCR.getStr(JLio/questdb/cairo/vm/api/MemoryCR$CharSequenceView;)Ljava/lang/CharSequence; [email protected] (101 bytes) @ 0x00007f7dc99486a8 [0x00007f7dc99484a0+0x0000000000000208]
#
# No core dump will be written. Core dumps have been disabled. To enable core dumping, try "ulimit -c unlimited" before starting Java again
#
# An error report file with more information is saved as:
# /root/.questdb/hs_err_pid1.log
Compiled method (c1) 386972 2153 3 io.questdb.cairo.vm.api.MemoryCR::getStr (101 bytes)
total in heap [0x00007f7dc9948210,0x00007f7dc9949438] = 4648
relocation [0x00007f7dc9948370,0x00007f7dc9948498] = 296
main code [0x00007f7dc99484a0,0x00007f7dc9948f80] = 2784
stub code [0x00007f7dc9948f80,0x00007f7dc9949060] = 224
oops [0x00007f7dc9949060,0x00007f7dc9949070] = 16
metadata [0x00007f7dc9949070,0x00007f7dc99490b8] = 72
scopes data [0x00007f7dc99490b8,0x00007f7dc9949220] = 360
scopes pcs [0x00007f7dc9949220,0x00007f7dc99493e0] = 448
dependencies [0x00007f7dc99493e0,0x00007f7dc99493e8] = 8
nul chk table [0x00007f7dc99493e8,0x00007f7dc9949438] = 80
#
# If you would like to submit a bug report, please visit:
# https://bugreport.java.com/bugreport/crash.jsp
#
[error occurred during error reporting (), id 0xb, SIGSEGV (0xb) at pc=0x00007f7ddf75b529]
[error occurred during error reporting (), id 0xb, SIGSEGV (0xb) at pc=0x00007f7ddf75b529]
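The crash header above already carries the two most useful facts: the signal and the frame that was executing. As a rough sketch of how they could be pulled out of a long log automatically, the snippet below parses a shortened copy of the excerpt. The function name `summarize_crash` is hypothetical, the sample text contains only a few lines from the log above, and the frame line is cut after the method signature.

```python
import re

# A few lines copied from the QuestDB log excerpt; the frame line is cut short.
CRASH_EXCERPT = """\
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007f7dc99486a8, pid=1, tid=38
#
# JRE version: OpenJDK Runtime Environment (17.0.1+12) (build 17.0.1+12-39)
# Problematic frame:
# J 2153 c1 io.questdb.cairo.vm.api.MemoryCR.getStr(JLio/questdb/cairo/vm/api/MemoryCR$CharSequenceView;)Ljava/lang/CharSequence;
#
"""

# Signal line: "# SIGSEGV (0xb) at pc=0x..., pid=1, tid=38"
SIGNAL_RE = re.compile(
    r"^#\s+(SIG[A-Z]+) \(0x[0-9a-f]+\) at pc=(0x[0-9a-f]+), pid=(\d+), tid=(\d+)"
)
# First dotted name that is directly followed by "(" is taken as the method.
METHOD_RE = re.compile(r"([\w.$]+)\(")


def summarize_crash(text):
    """Extract signal, pc, pid, tid and the problematic frame from a crash header."""
    summary = {}
    lines = text.splitlines()
    for index, line in enumerate(lines):
        signal = SIGNAL_RE.match(line)
        if signal:
            summary["signal"] = signal.group(1)
            summary["pc"] = signal.group(2)
            summary["pid"] = int(signal.group(3))
            summary["tid"] = int(signal.group(4))
        elif line.strip() == "# Problematic frame:" and index + 1 < len(lines):
            # The frame itself is printed on the line after the marker.
            method = METHOD_RE.search(lines[index + 1])
            if method:
                summary["frame"] = method.group(1)
    return summary


print(summarize_crash(CRASH_EXCERPT))
```

For this excerpt the summary points at a segmentation fault inside `MemoryCR.getStr`, that is, inside QuestDB's own string-reading code and not in the connector.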
connect log (extract)
[2022-04-06 07:01:50,706] WARN Unable to query database for maximum table name length; the connector may fail to write to tables with long names (io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect)
org.postgresql.util.PSQLException: ERROR: unknown function name: repeat(STRING,INT)
Position: 15
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:490)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:408)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:329)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:315)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:291)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:243)
at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.computeMaxIdentifierLength(PostgreSqlDatabaseDialect.java:119)
at io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect.getConnection(PostgreSqlDatabaseDialect.java:106)
at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80)
at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2022-04-06 07:01:50,709] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter)
[2022-04-06 07:01:50,745] INFO Checking PostgreSql dialect for existence of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:50,998] INFO Using PostgreSql dialect TABLE "topic_1" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:51,005] INFO Creating table with sql: CREATE TABLE "topic_1" (
"id" REAL NOT NULL,
"price" REAL NOT NULL,
"size" REAL NOT NULL,
"side" TEXT NOT NULL,
"liquidation" BOOLEAN NOT NULL,
"time" TEXT NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-06 07:01:51,184] INFO Checking PostgreSql dialect for existence of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:51,193] INFO Using PostgreSql dialect TABLE "topic_1" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,136] INFO Checking PostgreSql dialect for type of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,145] INFO Setting metadata for table "topic_1" to Table{name='"topic_1"', type=TABLE columns=[Column{'liquidation', isPrimaryKey=false, allowsNull=true, sqlType=bool}, Column{'size', isPrimaryKey=false, allowsNull=true, sqlType=float4}, Column{'time', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'price', isPrimaryKey=false, allowsNull=true, sqlType=float4}, Column{'id', isPrimaryKey=false, allowsNull=true, sqlType=float4}, Column{'side', isPrimaryKey=false, allowsNull=true, sqlType=varchar}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2022-04-06 07:01:52,154] INFO Checking PostgreSql dialect for existence of TABLE "topic_2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,339] INFO Using PostgreSql dialect TABLE "topic_2" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,339] INFO Creating table with sql: CREATE TABLE "topic_2" (
"id" REAL NOT NULL,
"price" REAL NOT NULL,
"size" REAL NOT NULL,
"side" TEXT NOT NULL,
"liquidation" BOOLEAN NOT NULL,
"time" TEXT NOT NULL) (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-06 07:01:52,661] INFO Checking PostgreSql dialect for existence of TABLE "topic_2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,669] INFO Using PostgreSql dialect TABLE "topic_2" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
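To compare how far the connector gets for each table, the INFO lines above can be grouped by table name. This is only a sketch over lines copied from the excerpt: the column definitions of the CREATE TABLE statements are left out, the long column list in the metadata line is cut at `...`, and the step labels and the function name `steps_per_table` are invented for illustration.

```python
import re
from collections import defaultdict

# Lines copied from the Connect log excerpt (shortened as described above).
LOG_EXCERPT = """\
[2022-04-06 07:01:50,745] INFO Checking PostgreSql dialect for existence of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:50,998] INFO Using PostgreSql dialect TABLE "topic_1" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:51,005] INFO Creating table with sql: CREATE TABLE "topic_1" (
[2022-04-06 07:01:51,184] INFO Checking PostgreSql dialect for existence of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:51,193] INFO Using PostgreSql dialect TABLE "topic_1" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,136] INFO Checking PostgreSql dialect for type of TABLE "topic_1" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,145] INFO Setting metadata for table "topic_1" to Table{name='"topic_1"', type=TABLE columns=[...]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2022-04-06 07:01:52,154] INFO Checking PostgreSql dialect for existence of TABLE "topic_2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,339] INFO Using PostgreSql dialect TABLE "topic_2" absent (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,339] INFO Creating table with sql: CREATE TABLE "topic_2" (
[2022-04-06 07:01:52,661] INFO Checking PostgreSql dialect for existence of TABLE "topic_2" (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
[2022-04-06 07:01:52,669] INFO Using PostgreSql dialect TABLE "topic_2" present (io.confluent.connect.jdbc.dialect.GenericDatabaseDialect)
"""

# (label, pattern) pairs; the labels are invented for this sketch.
PATTERNS = [
    ("check existence", re.compile(r'Checking PostgreSql dialect for existence of TABLE "([^"]+)"')),
    ("absent", re.compile(r'Using PostgreSql dialect TABLE "([^"]+)" absent')),
    ("present", re.compile(r'Using PostgreSql dialect TABLE "([^"]+)" present')),
    ("create", re.compile(r'Creating table with sql: CREATE TABLE "([^"]+)"')),
    ("check type", re.compile(r'Checking PostgreSql dialect for type of TABLE "([^"]+)"')),
    ("set metadata", re.compile(r'Setting metadata for table "([^"]+)"')),
]


def steps_per_table(log_text):
    """Map each table name to the ordered list of steps logged for it."""
    steps = defaultdict(list)
    for line in log_text.splitlines():
        for label, pattern in PATTERNS:
            match = pattern.search(line)
            if match:
                steps[match.group(1)].append(label)
                break  # one label per log line
    return dict(steps)


for table, labels in steps_per_table(LOG_EXCERPT).items():
    print(table, "->", ", ".join(labels))
```

On this excerpt, `topic_1` goes through every step up to "set metadata", while `topic_2` stops at "present". Whether that is the point where the crash happens or simply where the excerpt was cut cannot be told from the post.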