下载
编译与部署
开始使用
操作手册
- 数据导入
- 表结构变更
- 物化视图
- HTTP API
- FE
- MANAGER
- Bootstrap Action
- Cancel Load Action
- Check Decommission Action
- Check Storage Type Action
- Config Action
- Connection Action
- Get DDL Statement Action
- Get Load Info Action
- Get Load State
- Get FE log file
- Get Small File
- HA Action
- Hardware Info Action
- Health Action
- Log Action
- Logout Action
- Meta Action
- Meta Action
- Meta Replay State Action
- Profile Action
- Query Detail Action
- Query Profile Action
- Row Count Action
- Session Action
- Set Config Action
- Show Data Action
- Show Meta Info Action
- Show Proc Action
- Show Runtime Info Action
- Statement Execution Action
- System Action
- Table Query Plan Action
- Table Row Count Action
- Table Schema Action
- Upload Action
- CANCEL LABEL
- Compaction Action
- CONNECTION
- getlogfile
- GET LABEL STATE
- GET TABLETS ON A PARTICULAR BE
- PROFILE
- QUERY DETAIL
- RESTORE TABLET
- SHOW DATA
- MIGRATE SINGLE TABLET TO A PARTICULAR DISK
- GET TABLETS DISTRIBUTION BETWEEN DIFFERENT DISKS
- FE
- 运维操作
- 配置文件
- 拦截规则
- 备份与恢复
- Broker
- Colocation Join
- Bucket Shuffle Join
- 动态分区
- 数据导出
- 导出查询结果集
- 分区缓存
- 权限管理
- LDAP
- 资源管理
- 查询执行的统计
- Runtime Filter
- Segment V2 升级手册
- 文件管理器
- SQL MODE
- 时区
- 变量
- 更新
- 多租户和资源划分
最佳实践
扩展功能
- 审计日志插件
- Doris On ES
- Doris output plugin
- ODBC External Table Of Doris
- Doris 插件框架
- Spark Doris Connector
- Flink Doris Connector
- DataX doriswriter
- UDF
设计文档
SQL 手册
- SQL 函数
- 日期函数
- convert_tz
- curdate
- current_timestamp
- curtime,current_time
- date_add
- date_format
- date_sub
- datediff
- day
- dayname
- dayofmonth
- dayofweek
- dayofyear
- from_days
- from_unixtime
- hour
- makedate
- minute
- month
- monthname
- now
- second
- strtodate
- time_round
- timediff
- timestampadd
- timestampdiff
- to_days
- unix_timestamp
- utc_timestamp
- week
- weekofyear
- year
- yearweek
- 地理位置函数
- 字符串函数
- 聚合函数
- bitmap 函数
- Hash函数
- Doris 窗口函数使用
- CAST
- DIGITAL_MASKING
- 日期函数
- 语法帮助
- 用户账户管理
- 集群管理
- ADMIN CANCEL REPAIR
- ADMIN CLEAN TRASH
- ADMIN CHECK TABLET
- ADMIN REPAIR
- ADMIN SET CONFIG
- ADMIN SET REPLICA STATUS
- ADMIN SHOW CONFIG
- ADMIN SHOW REPLICA DISTRIBUTION
- ADMIN SHOW REPLICA STATUS
- ADMIN SHOW DATA SKEW
- ALTER CLUSTER
- ALTER SYSTEM
- CANCEL DECOMMISSION
- CREATE CLUSTER
- CREATE FILE
- DROP CLUSTER
- DROP FILE
- ENTER
- INSTALL PLUGIN
- LINK DATABASE
- MIGRATE DATABASE
- SET LDAPADMINPASSWORD
- SHOW BACKENDS
- SHOW BROKER
- SHOW FILE
- SHOW FRONTENDS
- SHOW FULL COLUMNS
- SHOW INDEX
- SHOW MIGRATIONS
- SHOW PLUGINS
- SHOW TABLE STATUS
- SHOW TRASH
- UNINSTALL PLUGIN
- DDL
- ALTER DATABASE
- ALTER TABLE
- ALTER VIEW
- BACKUP
- CANCEL ALTER
- CANCEL BACKUP
- CANCEL RESTORE
- CREATE DATABASE
- CREATE ENCRYPTKEY
- CREATE INDEX
- CREATE MATERIALIZED VIEW
- CREATE REPOSITORY
- CREATE RESOURCE
- CREATE TABLE LIKE
- CREATE TABLE
- CREATE VIEW
- DROP DATABASE
- DROP ENCRYPTKEY
- DROP INDEX
- DROP MATERIALIZED VIEW
- DROP REPOSITORY
- DROP RESOURCE
- DROP TABLE
- DROP VIEW
- HLL
- RECOVER
- RESTORE
- SHOW ENCRYPTKEYS
- SHOW RESOURCES
- TRUNCATE TABLE
- CREATE FUNCTION
- DROP FUNCTION
- SHOW FUNCTIONS
- DML
- BROKER LOAD
- CANCEL LOAD
- DELETE
- EXPORT
- GROUP BY
- LOAD
- MINI LOAD
- MULTI LOAD
- PAUSE ROUTINE LOAD
- RESUME ROUTINE LOAD
- ROUTINE LOAD
- SHOW ALTER
- SHOW BACKUP
- SHOW CREATE FUNCTION
- SHOW CREATE ROUTINE LOAD
- SHOW DATA
- SHOW DATABASES
- SHOW DELETE
- SHOW DYNAMIC PARTITION TABLES
- SHOW EXPORT
- SHOW LOAD
- SHOW PARTITIONS
- SHOW PROPERTY
- SHOW REPOSITORIES
- SHOW RESTORE
- SHOW ROUTINE LOAD TASK
- SHOW ROUTINE LOAD
- SHOW SNAPSHOT
- SHOW TABLES
- SHOW TABLET
- SHOW TRANSACTION
- SPARK LOAD
- STOP ROUTINE LOAD
- STREAM LOAD
- ALTER ROUTINE LOAD
- INSERT
- UPDATE
- 数据类型
- 辅助命令
开发者手册
- 调试工具
- Doris BE存储层Benchmark工具
- 使用 Eclipse 搭建 FE 开发环境
- 使用 IntelliJ IDEA 搭建 FE 开发环境
- Apache Doris Be 开发调试
- Java 代码格式化
- C++ 代码格式化
Apache 社区
文章来源于网络收集而来,版权归原创者所有,如有侵权请及时联系!
Spark Doris Connector
Spark Doris Connector 可以支持通过 Spark 读取 Doris 中存储的数据,也支持通过Spark写入数据到Doris。
- 支持从
Doris
中读取数据 - 支持
Spark DataFrame
批量/流式 写入Doris
- 可以将
Doris
表映射为DataFrame
或者RDD
,推荐使用DataFrame
。 - 支持在
Doris
端完成数据过滤,减少数据传输量。
版本兼容
Connector | Spark | Doris | Java | Scala |
---|---|---|---|---|
1.0.0 | 2.x | 0.12+ | 8 | 2.11 |
1.0.0 | 3.x | 0.12.+ | 8 | 2.12 |
编译与安装
在 extension/spark-doris-connector/
源码目录下执行:
注意:
- 这里如果你没有整体编译过 doris 源码,需要首先编译一次 Doris 源码,不然会出现 thrift 命令找不到的情况,需要到
incubator-doris
目录下执行sh build.sh
- 建议在 doris 的 docker 编译环境
apache/incubator-doris:build-env-1.2
下进行编译,因为 1.3 下面的JDK 版本是 11,会存在编译问题。
sh build.sh 3 ## spark 3.x版本,默认是3.1.2
sh build.sh 2 ## soark 2.x版本,默认是2.3.4
编译成功后,会在 output/
目录下生成文件 doris-spark-1.0.0-SNAPSHOT.jar
。将此文件复制到 Spark
的 ClassPath
中即可使用 Spark-Doris-Connector
。例如,Local
模式运行的 Spark
,将此文件放入 jars/
文件夹下。Yarn
集群模式运行的Spark
,则将此文件放入预部署包中。
使用示例
读取
SQL
CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
"table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
"fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"user"="$YOUR_DORIS_USERNAME",
"password"="$YOUR_DORIS_PASSWORD"
);
SELECT * FROM spark_doris;
DataFrame
val dorisSparkDF = spark.read.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.load()
dorisSparkDF.show(5)
RDD
import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
cfg = Some(Map(
"doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
"doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
"doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
))
)
dorisSparkRDD.collect()
写入
DataFrame(batch/stream)
## batch sink
val mockDataDF = List(
(3, "440403001005", "21.cn"),
(1, "4404030013005", "22.cn"),
(33, null, "23.cn")
).toDF("id", "mi_code", "mi_name")
mockDataDF.show(5)
mockDataDF.write.format("doris")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.save()
## stream sink(StructuredStreaming)
val kafkaSource = spark.readStream
.option("kafka.bootstrap.servers", "$YOUR_KAFKA_SERVERS")
.option("startingOffsets", "latest")
.option("subscribe", "$YOUR_KAFKA_TOPICS")
.format("kafka")
.load()
kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
.writeStream
.format("doris")
.option("checkpointLocation", "$YOUR_CHECKPOINT_LOCATION")
.option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
.option("user", "$YOUR_DORIS_USERNAME")
.option("password", "$YOUR_DORIS_PASSWORD")
.start()
.awaitTermination()
配置
通用配置项
Key | Default Value | Comment |
---|---|---|
doris.fenodes | -- | Doris FE http 地址,支持多个地址,使用逗号分隔 |
doris.table.identifier | -- | Doris 表名,如:db1.tbl1 |
doris.request.retries | 3 | 向Doris发送请求的重试次数 |
doris.request.connect.timeout.ms | 30000 | 向Doris发送请求的连接超时时间 |
doris.request.read.timeout.ms | 30000 | 向Doris发送请求的读取超时时间 |
doris.request.query.timeout.s | 3600 | 查询doris的超时时间,默认值为1小时,-1表示无超时限制 |
doris.request.tablet.size | Integer.MAX_VALUE | 一个RDD Partition对应的Doris Tablet个数。 此数值设置越小,则会生成越多的Partition。从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。 |
doris.batch.size | 1024 | 一次从BE读取数据的最大行数。增大此数值可减少Spark与Doris之间建立连接的次数。 从而减轻网络延迟所带来的的额外时间开销。 |
doris.exec.mem.limit | 2147483648 | 单个查询的内存限制。默认为 2GB,单位为字节 |
doris.deserialize.arrow.async | false | 是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch |
doris.deserialize.queue.size | 64 | 异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效 |
SQL 和 Dataframe 专有配置
Key | Default Value | Comment |
---|---|---|
user | -- | 访问Doris的用户名 |
password | -- | 访问Doris的密码 |
doris.filter.query.in.max.count | 100 | 谓词下推中,in表达式value列表元素最大数量。超过此数量,则in表达式条件过滤在Spark侧处理。 |
RDD 专有配置
Key | Default Value | Comment |
---|---|---|
doris.request.auth.user | -- | 访问Doris的用户名 |
doris.request.auth.password | -- | 访问Doris的密码 |
doris.read.field | -- | 读取Doris表的列名列表,多列之间使用逗号分隔 |
doris.filter.query | -- | 过滤读取数据的表达式,此表达式透传给Doris。Doris使用此表达式完成源端数据过滤。 |
Doris 和 Spark 列类型映射关系
Doris Type | Spark Type |
---|---|
NULL_TYPE | DataTypes.NullType |
BOOLEAN | DataTypes.BooleanType |
TINYINT | DataTypes.ByteType |
SMALLINT | DataTypes.ShortType |
INT | DataTypes.IntegerType |
BIGINT | DataTypes.LongType |
FLOAT | DataTypes.FloatType |
DOUBLE | DataTypes.DoubleType |
DATE | DataTypes.StringType1 |
DATETIME | DataTypes.StringType1 |
BINARY | DataTypes.BinaryType |
DECIMAL | DecimalType |
CHAR | DataTypes.StringType |
LARGEINT | DataTypes.StringType |
VARCHAR | DataTypes.StringType |
DECIMALV2 | DecimalType |
TIME | DataTypes.DoubleType |
HLL | Unsupported datatype |
- 注:Connector中,将
DATE
和DATETIME
映射为String
。由于Doris
底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用String
类型直接返回对应的时间可读文本。
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论