I have a database with time series data and this data is sent to Kafka.
Many consumers build aggregations and reporting based on this data.
My Kafka cluster stores data with a TTL of 1 day.
But how can I build a new report and run a new consumer from position 0, over data that no longer exists in Kafka but still exists in the source storage?
For example, is there some callback to the producer if I request an offset that does not exist in Kafka?
If that is not possible, please advise on other architectural solutions. I want to use the same codebase to aggregate this data.
If the data does not exist in Kafka, you cannot consume it, much less do any aggregation on top of it.
Moreover, there is no concept of a consumer requesting a producer. A producer sends data to Kafka broker(s), and consumers consume from those broker(s). There is no direct interaction between producers and consumers as such.
Since you say that the data still exists in the source DB, you can fetch it from there and produce it to Kafka again.
When you produce that data again, it will arrive as new messages, which will eventually be consumed by the consumers as usual.
In case you would like to differentiate between initial consumption and re-consumption, you can produce these messages to a new topic and have your consumers consume from that topic, as sketched below.
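
A minimal sketch of such a backfill job, assuming a JDBC source and String-serialized records; the topic name, connection URL, and table schema below are placeholders, not from the question:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TimeSeriesBackfill {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your cluster
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Hypothetical separate topic so regular consumers are unaffected;
        // point only the new report's consumer at it.
        String backfillTopic = "timeseries-backfill";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost/tsdb"); // hypothetical DB URL
             Statement stmt = conn.createStatement();
             // Hypothetical table and columns; replace with your source schema.
             ResultSet rs = stmt.executeQuery("SELECT series_id, ts, value FROM measurements ORDER BY ts")) {
            while (rs.next()) {
                String key = rs.getString("series_id"); // key by series to keep per-series ordering
                String value = rs.getLong("ts") + "," + rs.getDouble("value");
                producer.send(new ProducerRecord<>(backfillTopic, key, value));
            }
            producer.flush(); // ensure everything is delivered before closing
        }
    }
}
```

Because the messages are keyed the same way as the live feed, the same aggregation codebase can consume either topic unchanged.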
Another way is to increase your TTL (I suppose you mean retention in Kafka when you say TTL); then you can seek back to a timestamp in the consumers using the `offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)` and `seek(TopicPartition partition, long offset)` methods.
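
A minimal sketch of that seek-to-timestamp approach, assuming a single-partition topic named `timeseries` and String-serialized records (both placeholders). Note this only works for offsets still within the (increased) retention window:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SeekToTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // adjust to your cluster
        props.put("group.id", "new-report");              // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // Start of the window the new report needs, e.g. 7 days back.
        long startTimestamp = System.currentTimeMillis() - 7L * 24 * 60 * 60 * 1000;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Manual assignment; "timeseries"/partition 0 is a placeholder.
            TopicPartition tp = new TopicPartition("timeseries", 0);
            consumer.assign(Collections.singletonList(tp));

            // Find the earliest offset whose record timestamp is >= startTimestamp.
            Map<TopicPartition, Long> query = new HashMap<>();
            query.put(tp, startTimestamp);
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);

            OffsetAndTimestamp found = offsets.get(tp);
            if (found != null) {
                consumer.seek(tp, found.offset()); // rewind to that offset
            } // null: nothing at/after that timestamp is still retained

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("offset=%d ts=%d value=%s%n",
                    r.offset(), r.timestamp(), r.value()));
        }
    }
}
```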