如何使用ClickHouse-Go将AVRO数据插入有效的单击室?

发布于 2025-02-12 18:39:42 字数 1822 浏览 4 评论 0原文

背景

优化内存使用

问题

  1. 如何使用缓冲区将AVRO数据插入Clickhouse?
  2. 我可以直接从消耗PULSAR的情况下插入AVRO数据,而无需拆除和编组吗?

现在代码

  1. 我从Pulsar食用消息
msg, err := pulsarConsumer.Receive(ctx)

,然后将MSG发送到频道

dataWriteChan <- msg
  1. 另一个功能,以从频道接收MSG,然后使用Avro进行Unmarshal
msg <- dataWriteChan 
dataPayload := msg.Payload()

var avroData interface{}
err := avro.Unmarshal(avroCodec, dataPayload, &avroData)

,然后将Avrodata发送到slice slice to Cache

dataCache = append(dataCache, avroData)
  1. 直到DataCache达到20M
tmpBuf := make([]byte, 0)
bf := bytes.NewBuffer(tmpBuf)

config := goavro.OCFConfig{
        W:     bf,
        Codec: goavroCodec,
}

ocfWriter, _ := goavro.NewOCFWriter(config)

ocfWriter.Append(dataCache)

, 使用缓冲BF生成SQL

sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro %v", Database, TableName, cols, bf)

EXEC SQL

conn.Exec(ctx, sql)
  1. 上述步骤可以正常插入AVRO数据,我不想使用SprinRF生成SQL,因为它将变成新的内存。因此,我想使用缓冲区数据并更改为
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
conn.Exec(ctx, sql + "%s", data.String())

我不知道这是否可以节省内存;但是更大的问题是它插入错误!

 write to storage err: %!(NOVERB)%!(EXTRA string=code: 1001, message: avro::Exception: EOF reached)

多个有高度记忆消耗的地方

1. avro.Unmarshal(avroCodec, dataPayload, &avroData)
2. ocfWriter.Append(dataCache)
3. fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)

都感谢您,

无论您是否可以帮助解决它,我也感谢您愿意花时间一起思考!这对我来说非常重要。

Background

Optimize memory usage

Problem

  1. How to use buffer to insert avro data to clickhouse?
  2. Can I insert the avro data from consuming pulsar directly without unmarshalling and marshalling ?

Code

  1. Now I consume message from pulsar
msg, err := pulsarConsumer.Receive(ctx)

and send the msg to a channel

dataWriteChan <- msg
  1. another function to receive msg from channel and use avro to unmarshal
msg <- dataWriteChan 
dataPayload := msg.Payload()

var avroData interface{}
err := avro.Unmarshal(avroCodec, dataPayload, &avroData)

and then send the avroData to a slice to cache

dataCache = append(dataCache, avroData)
  1. Until dataCache reaches 20M, program begin to marshal and insert to clickhouse
tmpBuf := make([]byte, 0)
bf := bytes.NewBuffer(tmpBuf)

config := goavro.OCFConfig{
        W:     bf,
        Codec: goavroCodec,
}

ocfWriter, _ := goavro.NewOCFWriter(config)

ocfWriter.Append(dataCache)

then use the buffer bf to generate sql

sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro %v", Database, TableName, cols, bf)

exec sql

conn.Exec(ctx, sql)
  1. The above steps can insert avro data normally, I do not want to use Sprinrf to generate sql since it will malloc a new memory. So I want to use the buffer data and change to
sql := fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)
conn.Exec(ctx, sql + "%s", data.String())

I don't know if this can save memory; But the bigger problem is it insert with a error!

 write to storage err: %!(NOVERB)%!(EXTRA string=code: 1001, message: avro::Exception: EOF reached)

Several places with high memory consumption

1. avro.Unmarshal(avroCodec, dataPayload, &avroData)
2. ocfWriter.Append(dataCache)
3. fmt.Sprintf("INSERT INTO %s.%s (%s) FORMAT Avro ", Database, TableName, w.cols)

Thank you

No matter whether you can help solve it or not, I also appreciated you are willing to spend time thinking together! This is very important to me.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

白鸥掠海 2025-02-19 18:39:42

您似乎正在使用的驱动程序本身不支持AVRO格式。您需要将数据汇入结构。然后,您可以利用驾驶员的附录方法。

The driver itself which you appear to be using does not support avro format. You will need to marshall the data into a struct. You can then exploit the driver's appendStruct method.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文