返回介绍

Kafka 实现消息总线

发布于 2024-08-18 11:12:34 字数 7160 浏览 0 评论 0 收藏 0

Spring Cloud Bus 除了支持 RabbitMQ 的自动化配置之外,还支持现在被广泛应用的Kafka。在本节中,我们将搭建一个Kafka的本地环境,并通过它来尝试使用Spring Cloud Bus对Kafka的支持,实现消息总线的功能。

Kafka简介

Kafka是一个由LinkedIn开发的分布式消息系统,它于2011年年初开源,现在由著名的Apache基金会维护与开发。Kafka使用Scala实现,被用作LinkedIn的活动流和运营数据处理的管道,现在也被诸多互联网企业广泛地用作数据流管道和消息系统。

Kafka是基于消息发布-订阅模式实现的消息系统,其主要设计目标如下所述。

- 消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上的数据也能保证常数时间复杂度的访问性能。

- 高吞吐:在廉价的商用机器上也能支持单机每秒10万条以上的吞吐量。

- 分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序。

- 跨平台:支持不同技术平台的客户端(如Java、PHP、Python等)。

- 实时性:支持实时数据处理和离线数据处理。

- 伸缩性:支持水平扩展。

Kafka中涉及的一些基本概念,如下所示。

- Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。

- Topic:逻辑上同RabbitMQ的Queue队列相似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。)

- Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个 Partition,每个 Partition 对应一个文件夹(存储对应分区的消息内容和索引文件)。

- Producer:消息生产者,负责生产消息并发送到Kafka Broker。

- Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。

- Consumer Group:每个Consumer属于一个特定的组(可为每个Consumer指定属于一个组,若不指定则属于默认组),组可以用来实现一条消息被组内多个成员消费等功能。

快速入门

在对Kafka有了一些基本了解之后,下面我们来尝试搭建一个Kafka服务端,并体验一下基于Kafka的消息生产与消费。

环境安装

首先,我们需要从官网上下载安装介质。下载地址为http://kafka.apache.org/downloads.html。本例中采用的版本为Kafka-0.10.0.1。

在解压Kafka的安装包之后,可以看到其目录结构如下所示:

kafka

+-bin

+-windows

+-config

+-libs

+-logs

+-site-docs

由于Kafka的设计中依赖了ZooKeeper,所以我们在bin和config目录中除了看到Kafka 相关的内容之外,还有 ZooKeeper 相关的内容。其中 bin 目录中存放了 Kafka 和ZooKeeper 的命令行工具,bin 根目录下存放的是适用于 Linux/UNIX 的 shell,而bin/windows下存放的则是适用于Windows下的bat。我们可以根据实际的系统来设置环境变量,以方便后续的使用和操作。而config目录,则用来存放关于Kafka与ZooKeeper的配置信息。

启动测试

下面我们来尝试启动ZooKeeper和Kafka来进行消息的生产和消费。示例中所有的命令均以配置了Kafka的环境变量为例。

- 启动ZooKeeper,执行命令zookeeper-server-start config/zookeeper.properties,该命令需要指定 ZooKeeper 的配置文件位置才能正确启动,Kafka的压缩包中包含了其默认配置,开发与测试环境基本不需要修改,所以这里不做详细介绍,对于线上的调优需求,请读者自行查看官方文档进行操作。

[2016-09-28 08:05:34,849]INFO Reading configuration from: config\zookeeper.

properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2016-09-28 08:05:34,850]INFO autopurge.snapRetainCount set to 3(org.apache.

zookeeper.server.DatadirCleanupManager)

[2016-09-28 08:05:34,851]INFO autopurge.purgeInterval set to 0(org.apache.

zookeeper.server.DatadirCleanupManager)

[2016-09-28 08:05:34,851]INFO Purge task is not scheduled.(org.apache.zookeeper.

server.DatadirCleanupManager)

[2016-09-28 08:05:34,852]WARN Either no config or no quorum defined in config,

running in standalone mode(org.apache.zookeeper.server.quorum.QuorumPeerMain)

[2016-09-28 08:05:34,868]INFO Reading configuration from: config\zookeeper.

properties(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

[2016-09-28 08:05:34,869]INFO Starting server(org.apache.zookeeper.server.

ZooKeeperServerMain)

...

[2016-09-28 08:05:34,940]INFO binding to port 0.0.0.0/0.0.0.0:2181(org.apache.

zookeeper.server.NIOServerCnxnFactory)

从控制台信息中我们可以看到,ZooKeeper 从指定的 config/zookeeper.properties 配置文件中读取信息并绑定2181端口启动服务。有时候启动失败,可查看一下端口是否被占用,可以杀掉占用进程或通过修改config/zookeeper.properties配置文件中的clientPort内容以绑定其他端口号来启动ZooKeeper。

- 启动Kafka,执行命令kafka-server-start config/server.properties,该命令也需要指定Kafka配置文件的正确位置,如上命令中指向了解压目录包含的默认配置。若在测试时,使用外部集中环境的ZooKeeper的话,我们可以在该配置文件中通过zookeeper.connect参数来设置ZooKeeper的地址和端口,它默认会连接本地2181端口的ZooKeeper;如果需要设置多个ZooKeeper节点,可以为这个参数配置多个 ZooKeeper 地址,并用逗号分隔。比如 zookeeper.connect=127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002。除此之外,该配置文件中还提供了关于服务端连接、日志等配置参数,具体的线上配置可根据实际情况进行调整。

- 创建Topic,执行命令kafka-topics--create--zookeeper localhost:2181--replication-factor 1--partitions 1--topic test。通过该命令,创建了一个名为test的Topic,该Topic包含一个分区和一个Replica。在创建完成后,可以使用kafka-topics--list--zookeeper localhost:2181命令来查看当前的Topic。

另外,如果不使用 kafka-topics 命令来手工创建,直接使用下面的内容进行消息创建时也会自动创建Topics。

- 创建消息生产者,执行命令 kafka-console-producer--broker-list localhost:9092--topic test。kafka-console-producer 命令可以启动Kafka基于命令行的消息生产客户端,启动后可以直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。我们可以尝试输入几行消息,由于此时并没有消费者,所以这些输入的消息都会被阻塞在名为test的Topics中,直到有消费者将其消费掉。

- 创建消息消费者,执行命令 kafka-console-consumer--zookeeper localhost:2181--topic test--from-beginning。kafka-consoleconsumer命令启动的是Kafka基于命令行的消息消费客户端,启动之后,马上可以在控制台中看到输出了之前我们在消息生产客户端中发送的消息。我们可以再次打开之前的消息生产客户端来发送消息,并观察消费者这边对消息的输出来体验Kafka对消息的基础处理。

整合Spring Cloud Bus

在介绍Kafka之前,我们已经通过引入spring-cloud-starter-bus-amqp模块,完成了使用 RabbitMQ 来实现消息总线。若我们要使用 Kafka 来实现消息总线时,只需把spring-cloud-starter-bus-amqp替换成spring-cloud-starter-bus-kafka模块,在pom.xml的dependency节点中进行修改,具体如下:

<dependency>

<groupId>org.springframework.cloud</groupId>

<artifactId>spring-cloud-starter-bus-kafka</artifactId>

</dependency>

如果在启动 Kafka 时均采用了默认配置,那么我们不需要再做任何其他配置就能在本地实现从RabbitMQ到Kafka的切换。可以尝试把刚刚搭建的ZooKeeper、Kafka启动起来,并将修改为spring-cloud-starter-bus-kafka 模块的 config-server 和config-client启动起来。

在config-server启动时,我们可以在控制台中看到如下输出:

从控制台的输出内容我们可以看到,config-server连接到了Kafka中,并使用了名为springCloudBus的Topic。

此时,我们可以使用 kafka-topics--list--zookeeper localhost:2181命令来查看当前Kafka中的Topic。若已成功启动了config-server并配置正确,可以在Kafka中看到已经多了一个名为springCloudBus的Topic。

我们再启动配置了spring-cloud-starter-bus-kafka模块的config-client,可以看到控制台中输出了如下内容:

可以看到,config-client 启动时输出了类似的内容,它们都订阅了名为springCloudBus的Topic。从这里我们也可以知道,在消息总线上的节点,从结构上来说,不论是config-server还是config-client,它们都是对等的。

在启动了config-server和config-client之后,为了更明显地观察消息总线刷新配置的效果,我们可以在本地启动多个不同端口的 config-client。此时,我们的config-server 以及多个 config-client 都已经连接到了由 Kafka 实现的消息总线上。我们可以先访问各个config-client上的/from请求,查看它获取到的配置内容。然后,修改Git中对应的参数内容,再访问各个config-client上的/from请求,可以看到配置内容并没有改变。最后,我们向 config-server 发送 POST 请求:/bus/refresh,此时再去访问各个 config-client 上的/from 请求,就能获得最新的配置信息,各客户端上的配置都已经加载为最新的Git配置内容。

从config-client的控制台中,我们可以看到如下内容:

2016-09-29 08:20:34.361 INFO 21256---[kafka-binder-1]

o.s.cloud.bus.event.RefreshListener   : Received remote refresh request.Keys

refreshed[from]

RefreshListener监听类记录了收到远程刷新请求,并刷新了from属性的日志,在下一节中,我们将根据消息内容与日志输出信息作为线索来探索Spring Cloud Bus的工作机制。

Kafka配置

在上面的例子中,由于Kafka、ZooKeeper均运行于本地,在自动化配置的支持下,我们没有在测试程序中通过配置信息来指定Kafka和ZooKeeper的配置信息,就完成了本地消息总线的试验。但是在实际应用中,Kafka 和 ZooKeeper 一般都会独立部署,所以在应用中需要为Kafka和ZooKeeper配置一些连接信息等。Kafka的整合与RabbitMQ不同,在Spring Boot 1.3.7中并没有直接提供Starter模块,而是采用了Spring Cloud Stream的Kafka模块,所以对于Kafka的配置均采用了spring.cloud.stream.kafka前缀,具体的配置内容我们可以参考第10章的“绑定器配置”一节中关于Kafka配置的内容。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文