返回介绍

下载

编译与部署

开始使用

操作手册

最佳实践

扩展功能

设计文档

SQL 手册

开发者手册

Apache 社区

Flink Doris Connector

发布于 2021-09-30 01:42:43 字数 11798 浏览 2383 评论 0 收藏 0

Flink Doris Connector 可以支持通过 Flink 读写 Doris 中存储的数据。

  • 可以将Doris表映射为DataStream或者Table

版本兼容

ConnectorFlinkDorisJavaScala
1.0.01.11.x , 1.12.x0.13+82.12
1.0.01.13.x0.13.+82.12

针对Flink 1.13.x版本适配问题

    <properties>
        <scala.version>2.12</scala.version>
        <flink.version>1.11.2</flink.version>
        <libthrift.version>0.9.3</libthrift.version>
        <arrow.version>0.15.1</arrow.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <doris.home>${basedir}/../../</doris.home>
        <doris.thirdparty>${basedir}/../../thirdparty</doris.thirdparty>
    </properties>

只需要将这里的 flink.version 改成和你 Flink 集群版本一致,重新编译即可。

编译与安装

extension/flink-doris-connector/ 源码目录下执行:

注意:

  1. 这里如果你没有整体编译过 doris 源码,需要首先编译一次 Doris 源码,不然会出现 thrift 命令找不到的情况,需要到 incubator-doris 目录下执行 sh build.sh
  2. 建议在 doris 的 docker 编译环境 apache/incubator-doris:build-env-1.2 下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。
sh build.sh

编译成功后,会在 output/ 目录下生成文件 doris-flink-1.0.0-SNAPSHOT.jar。将此文件复制到 FlinkClassPath 中即可使用 Flink-Doris-Connector。例如,Local 模式运行的 Flink,将此文件放入 jars/ 文件夹下。Yarn集群模式运行的Flink,则将此文件放入预部署包中。:

备注:

  1. doris FE 要在配置中配置启用http v2
  2. Scala版本目前只支持2.12.x版本

conf/fe.conf

enable_http_server_v2 = true

使用示例

此步骤的目的是在Flink上注册Doris数据源。 此步骤在Flink上进行。 有两种使用sql和java的方法。 以下是示例说明

SQL

此步骤的目的是在Flink上注册Doris数据源。 此步骤在Flink上进行。

CREATE TABLE flink_doris_source (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);

CREATE TABLE flink_doris_sink (
    name STRING,
    age INT,
    price DECIMAL(5,2),
    sale DOUBLE
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
      'username' = '$YOUR_DORIS_USERNAME',
      'password' = '$YOUR_DORIS_PASSWORD'
);

INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source

DataStream

 Properties properties = new Properties();
 properties.put("fenodes","FE_IP:8030");
 properties.put("username","root");
 properties.put("password","");
 properties.put("table.identifier","db.table");
 env.addSource(new DorisSourceFunction(new DorisStreamOptions(properties),new SimpleListDeserializationSchema())).print();

配置

通用配置项

KeyDefault ValueComment
fenodes--Doris FE http 地址
table.identifier--Doris 表名,如:db1.tbl1
username--访问Doris的用户名
password--访问Doris的密码
doris.request.retries3向Doris发送请求的重试次数
doris.request.connect.timeout.ms30000向Doris发送请求的连接超时时间
doris.request.read.timeout.ms30000向Doris发送请求的读取超时时间
doris.request.query.timeout.s3600查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger.MAX_VALUE一个Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。从而提升Flink侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size1024一次从BE读取数据的最大行数。增大此数值可减少flink与Doris之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到flink-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效
doris.read.field--读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query--过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。
sink.batch.size100单次写BE的最大行数
sink.max-retries1写BE失败之后的重试次数
sink.batch.interval1sflush 间隔时间,超过该时间后异步线程将 缓存中数据写入BE。 默认值为1秒,支持时间单位ms、s、min、h和d。设置为0表示关闭定期写入。
sink.properties.*--Stream load 的导入参数。例如:'sink.properties.column_separator' = ','等。
支持JSON格式导入,需要同时开启'sink.properties.format' = 'json'和'sink.properties.strip_outer_array' = 'true'

Doris 和 Flink 列类型映射关系

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATESTRING
DATETIMESTRING
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文