How can I efficiently insert Avro data into ClickHouse using clickhouse-go?
Background
I want to reduce the memory usage of a program that consumes Avro messages from Pulsar and inserts them into ClickHouse.
Problem
- How can I insert Avro data into ClickHouse from a buffer?
- Can I insert the Avro data consumed from Pulsar directly, without unmarshalling and re-marshalling it?
Code
- Currently I consume a message from Pulsar
msg, err := pulsarConsumer.Receive(ctx)
and send the msg to a channel
dataWriteChan <- msg
- Another function receives the msg from the channel and unmarshals it with avro
msg := <-dataWriteChan
dataPayload := msg.Payload()
var avroData interface{}
err := avro.Unmarshal(avroCodec, dataPayload, &avroData)
and then appends avroData to a slice that serves as a cache
dataCache = append(dataCache, avroData)
- Once dataCache reaches 20M, the program marshals the cached data and inserts it into ClickHouse
tmpBuf := make([]byte, 0)
bf := bytes.NewBuffer(tmpBuf)
config := goavro.OCFConfig{
W: bf,
Codec: goavroCodec,
}
ocfWriter, _ := goavro.NewOCFWriter(config)
ocfWriter.Append(dataCache)
then use the buffer bf to generate the SQL
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro %v", Database, TableName, cols, bf)
and execute it
conn.Exec(ctx, sql)
- The steps above insert the Avro data correctly, but I do not want to use Sprintf to generate the SQL, since it allocates new memory for the whole statement. So I want to pass the buffer data as an argument instead, and changed the code to
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
conn.Exec(ctx, sql + "%s", data.String())
I don't know whether this saves memory, but the bigger problem is that the insert fails with an error:
write to storage err: %!(NOVERB)%!(EXTRA string=code: 1001, message: avro::Exception: EOF reached)
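A side note on reading the error line above: the `%!(NOVERB)` and `%!(EXTRA string=...)` fragments are not part of ClickHouse's reply. Go's fmt package emits them when a format string ends in a bare `%` and an argument is left over, so the server's own message is only `code: 1001, message: avro::Exception: EOF reached`. The sketch below reproduces the pattern with the standard library; the `render` helper and both format strings are assumptions about what the logging call might look like, not code from the question.

```go
package main

import "fmt"

// serverErr stands in for the string returned by err.Error().
const serverErr = "code: 1001, message: avro::Exception: EOF reached"

// render is a hypothetical helper that formats one message with the given format.
func render(format, msg string) string {
	return fmt.Sprintf(format, msg)
}

func main() {
	// A format ending in a bare '%' has no verb, so fmt reports NOVERB
	// and then lists the unused argument as EXTRA.
	fmt.Println(render("write to storage err: %", serverErr))

	// With a verb for the argument, only the server message is printed.
	fmt.Println(render("write to storage err: %s", serverErr))
}
```

Fixing the log format does not change the insert failure itself, but it makes the server's message easier to read.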
Several places with high memory consumption
1. avro.Unmarshal(avroCodec, dataPayload, &avroData)
2. ocfWriter.Append(dataCache)
3. fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
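For allocations like the second item, one general way to reduce memory churn is to keep a single long-lived bytes.Buffer per writer goroutine and call Reset between batches, rather than allocating a new buffer for every flush. The sketch below uses only the standard library. `batchEncoder` and `Encode` are hypothetical names, and the plain Write loop is a stand-in for the real encoding step (with goavro, the same buffer could be passed as W in OCFConfig, as in the code above).

```go
package main

import (
	"bytes"
	"fmt"
)

// batchEncoder is a hypothetical helper that owns one reusable buffer.
type batchEncoder struct {
	buf bytes.Buffer
}

// Encode writes all records into the reused buffer and returns its contents.
// The returned slice aliases the buffer and is only valid until the next Encode call.
func (e *batchEncoder) Encode(records [][]byte) []byte {
	e.buf.Reset() // empties the buffer but keeps its allocated storage
	for _, r := range records {
		e.buf.Write(r) // stand-in for the real encoding step
	}
	return e.buf.Bytes()
}

func main() {
	var enc batchEncoder

	first := enc.Encode([][]byte{[]byte("record-1"), []byte("record-2")})
	fmt.Println(len(first), enc.buf.Cap())

	// The second batch fits in the storage kept from the first one,
	// so the buffer does not need to allocate again.
	second := enc.Encode([][]byte{[]byte("record-3")})
	fmt.Println(len(second), enc.buf.Cap())
}
```

This only helps if each batch is fully sent before the next Encode call, because the returned slice shares memory with the buffer.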
Thank you
Whether or not you can help solve this, I appreciate you taking the time to think it through with me. It is very important to me.
Comments (1)
The driver you appear to be using does not itself support the Avro format. You will need to unmarshal the data into a struct. You can then use the driver's AppendStruct method on a batch.