I have a database with time series data and this data is sent to Kafka.
Many consumers build aggregations and reporting based on this data.
My Kafka cluster retains data with a TTL of 1 day.
But how can I build a new report and run a new consumer from the 0th position, which no longer exists in Kafka but still exists in the source storage?
For example, is there some callback to the producer if I request an offset that does not exist in Kafka?
If that is not possible, please advise other architectural solutions. I want to use the same codebase to aggregate this data.
If the data does not exist in Kafka, you cannot consume it, much less do any aggregation on top of it.
Moreover, there is no concept of a consumer requesting anything from a producer. A producer sends data to the Kafka broker(s), and consumers consume from those broker(s). There is no direct interaction between a producer and a consumer.
Since you say that the data still exists in the source DB, you can fetch it from there and produce it to Kafka again.
When you produce that data again, it will arrive as new messages, which will eventually be consumed by the consumers as usual.
If you would like to differentiate between initial consumption and re-consumption, you can produce these messages to a new topic and have your consumers consume from it.
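To make the split between "re-produce from the source DB" and "still readable from Kafka" concrete, here is a minimal sketch in plain Java. It does not touch Kafka at all; `ReplayPlanner`, `Plan`, and the field names are hypothetical, and it assumes that retention is purely time-based and that `now - retention` is a good enough estimate of the oldest data still in the topic (in practice brokers delete whole log segments, so slightly older data may still be present).

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: decides which part of a report's time range must be
// re-produced from the source DB and which part can be read from Kafka.
final class ReplayPlanner {

    static final class Plan {
        final Instant backfillFrom;   // inclusive; null when no backfill is needed
        final Instant backfillUntil;  // exclusive; null when no backfill is needed
        final Instant kafkaFrom;      // earliest timestamp to read from the regular topic

        Plan(Instant backfillFrom, Instant backfillUntil, Instant kafkaFrom) {
            this.backfillFrom = backfillFrom;
            this.backfillUntil = backfillUntil;
            this.kafkaFrom = kafkaFrom;
        }

        boolean needsBackfill() {
            return backfillFrom != null;
        }
    }

    // Assumption: everything older than (now - retention) is gone from Kafka.
    static Plan plan(Instant reportStart, Instant now, Duration retention) {
        Instant oldestRetained = now.minus(retention);
        if (reportStart.isBefore(oldestRetained)) {
            // The old part has to come from the source DB (for example via a replay topic).
            return new Plan(reportStart, oldestRetained, oldestRetained);
        }
        // The whole requested range is still expected to be in Kafka.
        return new Plan(null, null, reportStart);
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-10T00:00:00Z");
        Plan p = plan(Instant.parse("2024-01-01T00:00:00Z"), now, Duration.ofDays(1));
        System.out.println("needs backfill: " + p.needsBackfill());
        System.out.println("backfill range: " + p.backfillFrom + " .. " + p.backfillUntil);
        System.out.println("read Kafka from: " + p.kafkaFrom);
    }
}
```

With a plan like this, a backfill job would produce the `backfillFrom .. backfillUntil` range from the source DB to the replay topic, and the same aggregation code could then consume that topic first and the regular topic afterwards.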
Another way is to increase your TTL (I suppose you mean retention in Kafka when you say TTL) and then seek back to a timestamp in the consumers using the
offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
and
seek(TopicPartition partition, long offset)
methods.
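To illustrate what a timestamp lookup resolves to, here is a small in-memory model in plain Java. It is only a sketch, not the Kafka client: `TimestampSeekModel` and its fields are hypothetical. It mirrors the documented behaviour of `offsetsForTimes`, which returns, per partition, the earliest offset whose timestamp is greater than or equal to the requested one, or null when there is no such message. Here `OptionalLong.empty()` stands in for null.

```java
import java.util.OptionalLong;

// Hypothetical in-memory model of one partition after retention has
// removed the oldest records, so the log no longer starts at offset 0.
final class TimestampSeekModel {
    private final long logStartOffset;  // first offset that is still retained
    private final long[] timestamps;    // timestamps[i] belongs to offset logStartOffset + i

    TimestampSeekModel(long logStartOffset, long[] timestamps) {
        this.logStartOffset = logStartOffset;
        this.timestamps = timestamps.clone();
    }

    // Earliest retained offset whose timestamp is >= the target, or empty if none.
    // A linear scan is used because record timestamps need not be strictly increasing.
    OptionalLong offsetForTime(long targetTimestampMs) {
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= targetTimestampMs) {
                return OptionalLong.of(logStartOffset + i);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Offsets 0..499 have already been deleted by retention.
        TimestampSeekModel partition =
                new TimestampSeekModel(500L, new long[] {1_000L, 2_000L, 3_000L});

        // A timestamp older than anything retained resolves to the log start (offset 500),
        // not to offset 0: the deleted records cannot be reached by seeking.
        System.out.println(partition.offsetForTime(0L));
        // A timestamp between two records resolves to the next record (offset 502).
        System.out.println(partition.offsetForTime(2_500L));
        // A timestamp newer than every record yields no offset at all.
        System.out.println(partition.offsetForTime(9_000L));
    }
}
```

With the real consumer, the offset found this way would be passed to `seek` for the corresponding partition. The model also shows why a longer retention only helps for data that is still in the topic: records that have already been deleted have to be produced again from the source DB.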