How to optimize the performance of writing stream table subscriptions into the database

Today I load-tested the logic that writes into a DolphinDB distributed table via stream table subscription, and the streamTable --data--> partitionTable path only reaches a TPS of just over 100. Could you take a look and suggest optimizations? The two optimization angles I can think of so far are: (1) optimize the udf script; (2) find a better-performing approach (e.g., switch from single-record inserts to batched inserts).

// Schema of the stream table t_key_data_stream_zyg:
name             typeString typeInt comment
---------------- ---------- ------- -------
customer_code    SYMBOL     17             
reg_code         SYMBOL     17             
address          SYMBOL     17             
speed            INT        4              
order_num        SYMBOL     17             
item_code        SYMBOL     17             
base_create_time DATETIME   11             

// Schema of the partitioned table t_key_data_zyg:
name             typeString typeInt comment
---------------- ---------- ------- -------
customer_code    SYMBOL     17             
reg_code         SYMBOL     17             
address          SYMBOL     17             
speed            INT        4
production       DOUBLE     16             
order_num        SYMBOL     17             
item_code        SYMBOL     17             
base_create_time DATETIME   11        
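
For reference, a minimal sketch of definitions matching the two schemas above. The post does not show the database's partition scheme, so the daily VALUE partition on base_create_time below is purely an assumption:

share streamTable(10000:0, `customer_code`reg_code`address`speed`order_num`item_code`base_create_time, [SYMBOL,SYMBOL,SYMBOL,INT,SYMBOL,SYMBOL,DATETIME]) as t_key_data_stream_zyg

// Partition scheme is assumed (daily VALUE partitions); adjust to the real database layout
db = database("dfs://simulated_data_zyg", VALUE, 2022.09.01..2022.09.30)
t_key_data_zyg = db.createPartitionedTable(table(1:0, `customer_code`reg_code`address`speed`production`order_num`item_code`base_create_time, [SYMBOL,SYMBOL,SYMBOL,INT,DOUBLE,SYMBOL,SYMBOL,DATETIME]), "t_key_data_zyg", "base_create_time")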

// Subscribe to the stream data
subscribeTable(,"t_key_data_stream_zyg","t_key_data_zyg_subscriber",0,udf{loadTable('dfs://simulated_data_zyg','t_key_data_zyg'),t_key_data_stream_zyg},true,,,);

// Handler. Parameters: partitionTable is the partitioned table, t1 is the subscribed stream table, t holds the incoming records
def udf(mutable partitionTable, mutable t1,  t){
    customer_code = exec customer_code from t
    item_code = exec item_code from t
    regCode = exec reg_code from t
    addr = exec address from t
    nextTime = exec base_create_time from t
    nextSpeed = exec speed from t
    nextOrderNum = exec order_num from t
    
    // latest earlier record for this (reg_code, address) group, queried from the DFS table
    re = select * from partitionTable where reg_code = regCode[0] and address = addr[0] and base_create_time < nextTime[0] order by base_create_time desc limit 1
    // no history for this group: write the new record with production = speed
    if ((exec count(*) from re)==0){
        t3 = select customer_code,reg_code,address,speed,speed*1 as production,order_num,item_code,base_create_time from t
        append!(partitionTable,t3)
        return
    }
    curTime = exec base_create_time from re
    curSpeed = exec speed from re
    curOrderNum = exec order_num from re
    preProd = exec production from re
    t5=select customer_code,reg_code,address,speed,0 as production,order_num,item_code,base_create_time from t
    re.append!(t5)
    // default: carry the previous record's production forward
    // (this covers the case of the same order with a gap of more than 3 minutes)
    production = preProd[0]
    // minutes elapsed between the previous record and the new one
    interval = (nextTime[0] - curTime[0]) \ 60

    // order changed within 3 minutes: production = new speed * elapsed minutes
    if (nextOrderNum[0] != curOrderNum[0] && interval <= 3) {
        production = nextSpeed[0] * interval
    }
    // same order within 3 minutes: accumulate at the average of the two speeds
    if (nextOrderNum[0] == curOrderNum[0] && interval <= 3) {
        production = production + ((nextSpeed[0] + curSpeed[0]) \ 2)*interval
    }
    // order changed after more than 3 minutes: production = one unit of the new speed
    if (nextOrderNum[0] != curOrderNum[0] && interval > 3) {
        production = nextSpeed[0] * 1
    }
    t2 = table(take(customer_code[0],1) as customer_code,regCode[0] as reg_code,take(addr[0], 1) as address,take(nextSpeed[0],1) as speed, take(production, 1) as production, take(nextOrderNum[0], 1) as order_num, take(item_code[0], 1) as item_code, take(nextTime[0],1) as base_create_time)

    append!(partitionTable,t2)
}

Comments (1)

熟人话多 2022-09-19 03:49:30

To improve the performance of a stream table's pre-processing handler, the main idea is to switch from processing one record at a time to processing records in batches. DolphinDB Script is a vectorized language, and handling data in batches is far more efficient than record by record. Suggestions:
1. Rewrite udf so it can process multiple rows per call. As written, it assumes the incoming message holds a single row and only processes the first record (a combined sketch follows point 3 below).
2. Judging from the code, the re table inside udf fetches the last historical record per group:

re = select * from partitionTable where reg_code = regCode[0] and address = addr[0] and base_create_time < nextTime[0] order by base_create_time desc limit 1

For this pattern, a DolphinDB keyedTable can replace the query against the distributed table: a keyed table retains the last written row for each key group, so the lookup no longer reads from disk and the I/O bottleneck is avoided.
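
A minimal sketch of that idea (the names lastState and hist are illustrative, not from the post). The keyed table keeps exactly one row per (reg_code, address); inserting a row whose key already exists overwrites the previous one:

// In-memory keyed table holding the latest row per (reg_code, address)
lastState = keyedTable(`reg_code`address, 1000:0, `customer_code`reg_code`address`speed`production`order_num`item_code`base_create_time, [SYMBOL,SYMBOL,SYMBOL,INT,DOUBLE,SYMBOL,SYMBOL,DATETIME])

// Optionally prime it once at startup with the latest on-disk row per group,
// so the handler does not mistake existing groups for new ones
hist = select last(customer_code) as customer_code, last(speed) as speed, last(production) as production, last(order_num) as order_num, last(item_code) as item_code, last(base_create_time) as base_create_time from loadTable('dfs://simulated_data_zyg','t_key_data_zyg') group by reg_code, address
lastState.append!(select customer_code, reg_code, address, speed, production, order_num, item_code, base_create_time from hist)
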
3. Set the batchSize parameter on the subscription, so incoming records are accumulated and handed to udf in batches:

subscribeTable(...,true,batchSize,throttle,);
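
Putting the three points together, a minimal sketch (udfBatch, the lastState table above, and the batchSize/throttle values are illustrative, not from the original post). The per-row production logic still runs in a loop, because each record depends on the previous record of its group, but every lookup now hits the in-memory keyed table, and the DFS table receives one bulk append! per batch instead of one write per record:

def udfBatch(mutable partitionTable, mutable lastState, msg){
    // buffer the whole batch in memory; the DFS table is written once at the end
    buf = table(size(msg):0, `customer_code`reg_code`address`speed`production`order_num`item_code`base_create_time, [SYMBOL,SYMBOL,SYMBOL,INT,DOUBLE,SYMBOL,SYMBOL,DATETIME])
    for (i in 0:size(msg)) {
        regCode = msg.reg_code[i]
        addr = msg.address[i]
        // previous record of this group, read from memory instead of disk
        prev = select * from lastState where reg_code = regCode and address = addr
        if (size(prev) == 0) {
            production = double(msg.speed[i])    // first record ever seen for this group
        } else {
            interval = (msg.base_create_time[i] - prev.base_create_time[0]) \ 60
            sameOrder = msg.order_num[i] == prev.order_num[0]
            if (!sameOrder && interval <= 3) {
                production = double(msg.speed[i] * interval)
            } else if (sameOrder && interval <= 3) {
                production = prev.production[0] + ((msg.speed[i] + prev.speed[0]) \ 2) * interval
            } else if (!sameOrder) {
                production = double(msg.speed[i])
            } else {
                production = prev.production[0]  // same order with a gap > 3 minutes
            }
        }
        insert into buf values(msg.customer_code[i], regCode, addr, msg.speed[i], production, msg.order_num[i], msg.item_code[i], msg.base_create_time[i])
        // keyed table: the same key overwrites, so lastState always holds the newest row
        insert into lastState values(msg.customer_code[i], regCode, addr, msg.speed[i], production, msg.order_num[i], msg.item_code[i], msg.base_create_time[i])
    }
    append!(partitionTable, buf)                 // one batched write per invocation
}

// batchSize = 2000 rows and throttle = 1 second are placeholder values to tune under load
subscribeTable(, "t_key_data_stream_zyg", "t_key_data_zyg_subscriber", 0, udfBatch{loadTable('dfs://simulated_data_zyg','t_key_data_zyg'), lastState}, true, 2000, 1.0)

Depending on the deployment, lastState may also need to be declared as a shared table (share lastState as ...) so that the subscription worker and other sessions can safely access the same state.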