下载
编译与部署
开始使用
操作手册
- 数据导入
- 表结构变更
- 物化视图
- HTTP API
- FE
- MANAGER
- Bootstrap Action
- Cancel Load Action
- Check Decommission Action
- Check Storage Type Action
- Config Action
- Connection Action
- Get DDL Statement Action
- Get Load Info Action
- Get Load State
- Get FE log file
- Get Small File
- HA Action
- Hardware Info Action
- Health Action
- Log Action
- Logout Action
- Meta Action
- Meta Action
- Meta Replay State Action
- Profile Action
- Query Detail Action
- Query Profile Action
- Row Count Action
- Session Action
- Set Config Action
- Show Data Action
- Show Meta Info Action
- Show Proc Action
- Show Runtime Info Action
- Statement Execution Action
- System Action
- Table Query Plan Action
- Table Row Count Action
- Table Schema Action
- Upload Action
- CANCEL LABEL
- Compaction Action
- CONNECTION
- getlogfile
- GET LABEL STATE
- GET TABLETS ON A PARTICULAR BE
- PROFILE
- QUERY DETAIL
- RESTORE TABLET
- SHOW DATA
- MIGRATE SINGLE TABLET TO A PARTICULAR DISK
- GET TABLETS DISTRIBUTION BETWEEN DIFFERENT DISKS
- FE
- 运维操作
- 配置文件
- 拦截规则
- 备份与恢复
- Broker
- Colocation Join
- Bucket Shuffle Join
- 动态分区
- 数据导出
- 导出查询结果集
- 分区缓存
- 权限管理
- LDAP
- 资源管理
- 查询执行的统计
- Runtime Filter
- Segment V2 升级手册
- 文件管理器
- SQL MODE
- 时区
- 变量
- 更新
- 多租户和资源划分
最佳实践
扩展功能
- 审计日志插件
- Doris On ES
- Doris output plugin
- ODBC External Table Of Doris
- Doris 插件框架
- Spark Doris Connector
- Flink Doris Connector
- DataX doriswriter
- UDF
设计文档
SQL 手册
- SQL 函数
- 日期函数
- convert_tz
- curdate
- current_timestamp
- curtime,current_time
- date_add
- date_format
- date_sub
- datediff
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- from_days
- from_unixtime
- hour
- makedate
- minute
- month
- monthname
- now
- second
- strtodate
- time_round
- timediff
- timestampadd
- timestampdiff
- to_days
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- year
- yearweek
- 地理位置函数
- 字符串函数
- 聚合函数
- bitmap 函数
- Hash函数
- Doris 窗口函数使用
- CAST
- DIGITAL_MASKING
- 日期函数
- 语法帮助
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR
- ADMIN CLEAN TRASH
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ADMIN SHOW DATA SKEW
- ALTER CLUSTER
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE CLUSTER
- CREATE FILE
- DROP CLUSTER
- DROP FILE
- ENTER
- INSTALL PLUGIN
- LINK DATABASE
- MIGRATE DATABASE
- SET LDAPADMINPASSWORD
- SHOW BACKENDS
- SHOW BROKER
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW MIGRATIONS
- SHOW PLUGINS
- SHOW TABLE STATUS
- SHOW TRASH
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER TABLE
- ALTER VIEW
- BACKUP
- CANCEL ALTER
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE DATABASE
- CREATE ENCRYPTKEY
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- DROP DATABASE
- DROP ENCRYPTKEY
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- HLL
- RECOVER
- RESTORE
- SHOW ENCRYPTKEYS
- SHOW RESOURCES
- TRUNCATE TABLE
- CREATE FUNCTION
- DROP FUNCTION
- SHOW FUNCTIONS
- DML
- BROKER LOAD
- CANCEL LOAD
- DELETE
- EXPORT
- GROUP BY
- LOAD
- MINI LOAD
- MULTI LOAD
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- ROUTINE LOAD
- SHOW ALTER
- SHOW BACKUP
- SHOW CREATE FUNCTION
- SHOW CREATE ROUTINE LOAD
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD TASK
- SHOW ROUTINE LOAD
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- ALTER ROUTINE LOAD
- INSERT
- UPDATE
- 数据类型
- 辅助命令
开发者手册
- 调试工具
- Doris BE存储层Benchmark工具
- 使用 Eclipse 搭建 FE 开发环境
- 使用 IntelliJ IDEA 搭建 FE 开发环境
- Apache Doris Be 开发调试
- Java 代码格式化
- C++ 代码格式化
Apache 社区
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
Flink Doris Connector
Flink Doris Connector 可以支持通过 Flink 读写 Doris 中存储的数据。
- 可以将
Doris
表映射为DataStream
或者Table
。
版本兼容
Connector | Flink | Doris | Java | Scala |
---|---|---|---|---|
1.0.0 | 1.11.x , 1.12.x | 0.13+ | 8 | 2.12 |
1.0.0 | 1.13.x | 0.13.+ | 8 | 2.12 |
针对Flink 1.13.x版本适配问题
<properties>
<scala.version>2.12</scala.version>
<flink.version>1.11.2</flink.version>
<libthrift.version>0.9.3</libthrift.version>
<arrow.version>0.15.1</arrow.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<doris.home>${basedir}/../../</doris.home>
<doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
</properties>
只需要将这里的 flink.version
改成和你 Flink 集群版本一致,重新编译即可。
编译与安装
在 extension/flink-doris-connector/
源码目录下执行:
注意:
- 这里如果你没有整体编译过 doris 源码,需要首先编译一次 Doris 源码,不然会出现 thrift 命令找不到的情况,需要到
incubator-doris
目录下执行sh build.sh
- 建议在 doris 的 docker 编译环境
apache/incubator-doris:build-env-1.2
下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。
sh build.sh
编译成功后,会在 output/
目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar
。将此文件复制到 Flink
的 ClassPath
中即可使用 Flink-Doris-Connector
。例如,Local
模式运行的 Flink
,将此文件放入 jars/
文件夹下。Yarn
集群模式运行的Flink
,则将此文件放入预部署包中。:
备注:
- doris FE 要在配置中配置启用http v2
- Scala版本目前只支持2.12.x版本
conf/fe.conf
enable_http_server_v2 = true
使用示例
此步骤的目的是在Flink上注册Doris数据源。 此步骤在Flink上进行。 有两种使用sql和java的方法。 以下是示例说明
SQL
此步骤的目的是在Flink上注册Doris数据源。 此步骤在Flink上进行。
CREATE TABLE flink_doris_source (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
'username' = '$YOUR_DORIS_USERNAME',
'password' = '$YOUR_DORIS_PASSWORD'
);
CREATE TABLE flink_doris_sink (
name STRING,
age INT,
price DECIMAL(5,2),
sale DOUBLE
)
WITH (
'connector' = 'doris',
'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
'username' = '$YOUR_DORIS_USERNAME',
'password' = '$YOUR_DORIS_PASSWORD'
);
INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
DataStream
Properties properties = new Properties();
properties.put("fenodes","FE_IP:8030");
properties.put("username","root");
properties.put("password","");
properties.put("table.identifier","db.table");
env.addSource(new DorisSourceFunction(new DorisStreamOptions(properties),new SimpleListDeserializationSchema())).print();
配置
通用配置项
Key | Default Value | Comment |
---|---|---|
fenodes | -- | Doris FE http 地址 |
table.identifier | -- | Doris 表名,如:db1.tbl1 |
username | -- | 访问Doris的用户名 |
password | -- | 访问Doris的密码 |
doris.request.retries | 3 | 向Doris发送请求的重试次数 |
doris.request.connect.timeout.ms | 30000 | 向Doris发送请求的连接超时时间 |
doris.request.read.timeout.ms | 30000 | 向Doris发送请求的读取超时时间 |
doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 |
doris.request.tablet.size | Integer.MAX_VALUE | 一个Partition对应的Doris Tablet个数。 此数值设置越小,则会生成越多的Partition。从而提升Flink侧的并行度,但同时会对Doris造成更大的压力。 |
doris.batch.size | 1024 | 一次从BE读取数据的最大行数。增大此数值可减少flink与Doris之间建立连接的次数。 从而减轻网络延迟所带来的的额外时间开销。 |
doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
doris.deserialize.arrow.async | false | 是否支持异步转换Arrow格式到flink-doris-connector迭代所需的RowBatch |
doris.deserialize.queue.size | 64 | 异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效 |
doris.read.field | -- | 读取Doris表的列名列表,多列之间使用逗号分隔 |
doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
sink.batch.size | 100 | 单次写BE的最大行数 |
sink.max-retries | 1 | 写BE失败之后的重试次数 |
sink.batch.interval | 1s | flush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。 |
sink.properties.* | -- | Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。 支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true' |
Doris 和 Flink 列类型映射关系
Doris Type | Flink Type |
---|---|
NULL_TYPE | NULL |
BOOLEAN | BOOLEAN |
TINYINT | TINYINT |
SMALLINT | SMALLINT |
INT | INT |
BIGINT | BIGINT |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | STRING |
DATETIME | STRING |
DECIMAL | DECIMAL |
CHAR | STRING |
LARGEINT | STRING |
VARCHAR | STRING |
DECIMALV2 | DECIMAL |
TIME | DOUBLE |
HLL | Unsupported datatype |
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论