Is there a way to replace old messages with the latest message when consuming from Kafka (to avoid duplicates in the final df)?

Posted on 2025-01-11 13:10:01

I am consuming data from a Kafka topic and, since the data arrives in real time, we see repeated elements. How can I actually replace the old message with the latest one?

I am using the following code to consume from the topic:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructField, StructType, StringType

schema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("cTime", StringType(), True),
        StructField("latestTime", StringType(), False),
        StructField("service", StringType(), True),
    ]
)

topic = "topic1"
bootstrap_servers = "mrdc.it.com:9093,mrdc.it.com:9093,mrdc.it.com:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx.aud.com" password="xxxxxxxx";',
    "kafka.ssl.ca.location": "/tmp/cert.crt",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    "failOnDataLoss": "false",
    "subscribe": topic,
    "startingOffsets": "latest",
    "enable.auto.commit": "false",
    "auto.offset.reset": "false",
    "enable.partition.eof": "true",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
}
kafka_df = spark.readStream.format("kafka").options(**options).load()

# Parse the Kafka value bytes into the typed struct defined above
kafka_mobile_apps_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("apps"))

# avertack_kafka_eventhub_connections / kafka_config are internal helpers not shown here
df = avertack_kafka_eventhub_connections(source="KAFKA", kafka_config=kafka_config)

sql_features = ["apps.Id",
                "apps.cTime",
                "apps.latestTime",
                "apps.service"
               ]

kafka_df_features = df.selectExpr(sql_features)
display(kafka_df_features)

The output is as shown below:

Id              cTime                   latestTime              service
3178    2022-03-03T20:39:52.889Z    2022-03-03T20:39:58.601Z    mobile
3178    2022-03-03T20:39:52.889Z    2022-03-03T20:39:59.012Z    mobile
3240    2022-03-03T20:39:59.140Z    2022-03-03T20:39:59.220Z    mobile
3246    2022-03-03T20:40:00.615Z    2022-03-03T20:40:00.648Z    mobile
.
.
.

How can we overwrite row 1 with row 2, using ["Id"] as the key and comparing the "latestTime" column, so that only the message with the latest time is kept?

Is there a way to do this in real time? If not, how can we at least run it once an hour to replace the old messages with the new ones?

Final output:

Id              cTime                   latestTime              service
3178    2022-03-03T20:39:52.889Z    2022-03-03T20:39:59.012Z    mobile
3240    2022-03-03T20:39:59.140Z    2022-03-03T20:39:59.220Z    mobile
3246    2022-03-03T20:40:00.615Z    2022-03-03T20:40:00.648Z    mobile
.
.
.
.

Comments (1)

提笔书几行 2025-01-18 13:10:01

Spark is intended more for batch/micro-batch processing; it gets a collection of records, which you can order by time and take the "latest" from, or you can group by id and do the same...
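
As a minimal sketch of that group-by-id / take-the-latest step, assuming the kafka_df_features frame from the question (Id and latestTime as in the sample output), something like this works on a static/batch DataFrame or inside foreachBatch; plain window functions are not supported directly on a streaming DataFrame:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# Rank the rows of each Id by latestTime (newest first) and keep only the top one.
w = Window.partitionBy("Id").orderBy(col("latestTime").desc())

deduped = (kafka_df_features
           .withColumn("rn", row_number().over(w))
           .filter(col("rn") == 1)
           .drop("rn"))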

But you will have to combine this with some database / persistent storage to create a view of "most recent, by id". I've seen this done with HBase, Couchbase, MongoDB, etc., all of which have some level of Spark integration, if that is a requirement.
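
As one illustration of that pattern (using Delta Lake as the persistent store rather than the systems named above, since display() suggests Databricks; the table and checkpoint paths are placeholders and the Delta table is assumed to exist), a hedged foreachBatch sketch that upserts the newest row per Id each micro-batch, and can be triggered hourly as asked, could look like:

from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

def upsert_latest(batch_df, batch_id):
    # Keep only the newest row per Id inside this micro-batch...
    w = Window.partitionBy("Id").orderBy(col("latestTime").desc())
    latest = (batch_df.withColumn("rn", row_number().over(w))
                      .filter(col("rn") == 1)
                      .drop("rn"))
    # ...then MERGE by Id into the target Delta table, overwriting only when newer.
    target = DeltaTable.forPath(spark, "/mnt/tables/apps_latest")  # placeholder path
    (target.alias("t")
           .merge(latest.alias("s"), "t.Id = s.Id")
           .whenMatchedUpdateAll(condition="s.latestTime > t.latestTime")
           .whenNotMatchedInsertAll()
           .execute())

(kafka_df_features.writeStream
    .trigger(processingTime="1 hour")  # or drop this for continuous micro-batches
    .foreachBatch(upsert_latest)
    .option("checkpointLocation", "/mnt/checkpoints/apps_latest")  # placeholder path
    .start())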

Out of the box, I don't think Spark provides this easily (there is a RocksDB state store you can look at).
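
If you do look at that route, the RocksDB state store is switched on through a session config (Spark 3.2+); a hedged sketch, noting that Spark's built-in streaming de-duplication keeps the first row it sees per key rather than the latest:

# Use RocksDB instead of the default in-memory state store (Spark 3.2+).
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
)

# Stateful de-duplication by key: avoids repeats, but keeps the *first* arrival per Id,
# and without a watermark the state grows without bound.
deduped_stream = kafka_df_features.dropDuplicates(["Id"])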


Alternatively, and natively to Kafka, there is Kafka Streams, which provides exactly what you're looking for, although it is done in Java.

If you must use Python, ksqlDB can be set up; its API can be used from any other language and lets you define stream processing more easily as SQL statements.


Kafka Connect is another alternative: for example, if you have a standard relational database and the Id in your data is the primary key of a table, matching keys will perform UPDATE queries and overwrite existing records, while new Ids will be inserted.
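
As a hedged illustration of that last option (assuming the Confluent JDBC sink connector, a reachable Connect REST endpoint, and topic values the sink can deserialize; all hosts, credentials and the connector name are placeholders), registering such an upsert sink from Python could look like:

import json
import requests  # assumes the requests package is available

connector = {
    "name": "topic1-jdbc-sink",  # placeholder connector name
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "topic1",
        "connection.url": "jdbc:postgresql://db-host:5432/apps",  # placeholder
        "connection.user": "user",          # placeholder
        "connection.password": "password",  # placeholder
        "insert.mode": "upsert",            # UPDATE on a matching key, INSERT otherwise
        "pk.mode": "record_value",
        "pk.fields": "Id",                  # the key to match on
        "auto.create": "true",
    },
}

resp = requests.post(
    "http://connect-host:8083/connectors",  # placeholder Connect REST endpoint
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()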
