消息队列整理完成
141
Go/5 GoWeb开发/kafka/0 消息队列.md
Normal file
@@ -0,0 +1,141 @@
|
||||
# 消息队列
|
||||
|
||||
|
||||
|
||||
## 消息队列本质
|
||||
|
||||
|
||||
### 定义
|
||||
|
||||
生产者先将消息投递一个叫做「队列」的容器中,然后再从这个容器中取出消息,最后再转发给消费者
|
||||
|
||||

|
||||
|
||||
1. 消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
|
||||
2. 队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
|
||||
|
||||
再看今天我们最常用的消息队列产品(RocketMQ、Kafka 等等),你会发现:它们都在最原始的消息模型上做了扩展,同时提出了一些新名词,比如:主题(topic)、分区(partition)、队列(queue)等等。
|
||||
|
||||
|
||||
## 2 消息模型
|
||||
### 队列模型P2P模式
|
||||
|
||||
最初的消息队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过,队列没有 “读” 这个操作,读就是出队,从队头中 “删除” 这个消息。
|
||||
|
||||

|
||||
|
||||
这便是队列模型:它允许多个生产者往同一个队列发送消息。但是,如果有多个消费者,实际上是竞争的关系,也就是一条消息只能被其中一个消费者接收到,读完即被删除。
|
||||
|
||||
### 发布订阅模型
|
||||
|
||||
|
||||
如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。一个可行的方案是:为每个消费者创建一个单独的队列,让生产者发送多份。这种做法比较笨,而且同一份数据会被复制多份,也很浪费空间。为了解决这个问题,就演化出了另外一种消息模型:发布-订阅模型。
|
||||
|
||||

|
||||
|
||||
|
||||
在发布-订阅模型中,存放消息的容器变成了 “主题”,订阅者在接收消息之前需要先 “订阅主题”。最终,每个订阅者都可以收到同一个主题的全量消息。仔细对比下它和 “队列模式” 的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费。
|
||||
|
||||
|
||||
现代主流的 RocketMQ、Kafka 都是直接基于发布-订阅模型实现。
|
||||
|
||||
|
||||
## 3 应用场景
|
||||
|
||||
透过模型看 MQ 的应用场景目前,MQ 的应用场景非常多,大家能倒背如流的是:**系统解耦、异步通信和流量削峰**。除此之外,还有**延迟通知、最终一致性保证、顺序消息、流式处理**等等。
|
||||
|
||||
那到底是先有消息模型,还是先有应用场景呢?答案肯定是:先有应用场景(也就是先有问题),再有消息模型,因为消息模型只是解决方案的抽象而已。
|
||||
|
||||
MQ 经过 30 多年的发展,能从最原始的队列模型发展到今天百花齐放的各种消息中间件(平台级的解决方案),我觉得万变不离其宗,还是得益于:消息模型的适配性很广。我们试着重新理解下消息队列的模型。它其实解决的是:生产者和消费者的通信问题。那它对比 RPC 有什么联系和区别呢?
|
||||
|
||||
|
||||

|
||||
|
||||
通过对比,能很明显地看出两点差异:
|
||||
|
||||
1. 引入 MQ 后,由之前的一次 RPC 变成了现在的两次 RPC,而且生产者只跟队列耦合,它根本无需知道消费者的存在。
|
||||
2. 多了一个中间节点「队列」进行消息转储,相当于将同步变成了异步。
|
||||
|
||||
### 实例
|
||||
|
||||
举一个实际例子,比如说电商业务中最常见的「订单支付」场景:在订单支付成功后,需要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等
|
||||
|
||||
|
||||

|
||||
|
||||
引入 MQ 后,订单支付现在只需要关注它最重要的流程:更新订单状态即可。其他不重要的事情全部交给 MQ 来通知。这便是 MQ 解决的最核心的问题:系统解耦。改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,而且后续业务再扩展(比如:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,降低了维护成本。这个改造还带来了另外一个好处:因为 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤全部变成了异步执行,能减少订单支付的整体耗时,提升订单系统的吞吐量。这便是 MQ 的另一个典型应用场景:异步通信。除此以外,由于队列能转储消息,对于超出系统承载能力的场景,可以用 MQ 作为 “漏斗” 进行限流保护,即所谓的流量削峰。我们还可以利用队列本身的顺序性,来满足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……MQ 其他的应用场景基本类似,都能回归到消息模型的特性上,找到它适用的原因,这里就不一一分析了。
|
||||
|
||||
## 4 设计一个 MQ
|
||||
|
||||
### 需求说明
|
||||
|
||||
文章开头说过,任何 MQ 无外乎:一发一存一消费,这是 MQ 最核心的功能需求。另外,从技术维度来看 MQ 的通信模型,可以理解成:两次 RPC + 消息转储。
|
||||
|
||||
1. 直接利用成熟的 RPC 框架(Dubbo 或者 Thrift),实现两个接口:发消息和读消息。
|
||||
2. 消息放在本地内存中即可,数据结构可以用 JDK 自带的 ArrayBlockingQueue 。
|
||||
|
||||
|
||||
### 难点说明
|
||||
|
||||
最基础的功能:发消息、存消息、消费消息(支持发布-订阅模式)。
|
||||
|
||||
1. 高并发场景下,如何保证收发消息的性能?
|
||||
2. 如何保证消息服务的高可用和高可靠?
|
||||
3. 如何保证服务是可以水平任意扩展的?
|
||||
4. 如何保证消息存储也是水平可扩展的?
|
||||
5. 各种元数据(比如集群中的各个节点、主题、消费关系等)如何管理,需不需要考虑数据的一致性?
|
||||
|
||||
|
||||
### 整体设计
|
||||
|
||||
整体架构,会涉及三类角色:
|
||||
|
||||

|
||||
|
||||
核心流程进一步细化后,比较完整的数据流如下:
|
||||
|
||||

|
||||
|
||||
1. Broker(服务端):MQ 中最核心的部分,是 MQ 的服务端,核心逻辑几乎全在这里,它为生产者和消费者提供 RPC 接口,负责消息的存储、备份和删除,以及消费关系的维护等。
|
||||
2. Producer(生产者):MQ 的客户端之一,调用 Broker 提供的 RPC 接口发送消息。
|
||||
3. Consumer(消费者):MQ 的另外一个客户端,调用 Broker 提供的 RPC 接口接收消息,同时完成消费确认。
|
||||
|
||||
|
||||
### 详细设计
|
||||
|
||||
|
||||
**难点1:RPC 通信**
|
||||
|
||||
解决的是 Broker 与 Producer 以及 Consumer 之间的通信问题。如果不重复造轮子,直接利用成熟的 RPC 框架 Dubbo 或者 Thrift 实现即可,这样不需要考虑服务注册与发现、负载均衡、通信协议、序列化方式等一系列问题了。
|
||||
|
||||
当然,你也可以基于 Netty 来做底层通信,用 Zookeeper、Euraka 等来做注册中心,然后自定义一套新的通信协议(类似 Kafka),也可以基于 AMQP 这种标准化的 MQ 协议来做实现(类似 RabbitMQ)。对比直接用 RPC 框架,这种方案的定制化能力和优化空间更大。
|
||||
|
||||
**难点2:高可用设计**
|
||||
|
||||
高可用主要涉及两方面:Broker 服务的高可用、存储方案的高可用。可以拆开讨论。
|
||||
|
||||
Broker 服务的高可用,只需要保证 Broker 可水平扩展进行集群部署即可,进一步通过服务自动注册与发现、负载均衡、超时重试机制、发送和消费消息时的 ack 机制来保证。
|
||||
|
||||
存储方案的高可用有两个思路:
|
||||
|
||||
1)参考 Kafka 的分区 + 多副本模式,但是需要考虑分布式场景下数据复制和一致性方案(类似 Zab、Raft等协议),并实现自动故障转移;
|
||||
|
||||
2)还可以用主流的 DB、分布式文件系统、带持久化能力的 KV 系统,它们都有自己的高可用方案。
|
||||
|
||||
|
||||
**难点3:存储设计**
|
||||
|
||||
消息的存储方案是 MQ 的核心部分,可靠性保证已经在高可用设计中谈过了,可靠性要求不高的话直接用内存或者分布式缓存也可以。这里重点说一下存储的高性能如何保证?这个问题的决定因素在于存储结构的设计。
|
||||
|
||||
目前主流的方案是:追加写日志文件(数据部分) + 索引文件的方式(很多主流的开源 MQ 都是这种方式),索引设计上可以考虑稠密索引或者稀疏索引,查找消息可以利用跳转表、二份查找等,还可以通过操作系统的页缓存、零拷贝等技术来提升磁盘文件的读写性能。如果不追求很高的性能,也可以考虑现成的分布式文件系统、KV 存储或者数据库方案。
|
||||
|
||||
|
||||
**难点4:消费关系管理**
|
||||
|
||||
为了支持发布-订阅的广播模式,Broker 需要知道每个主题都有哪些 Consumer 订阅了,基于这个关系进行消息投递。
|
||||
|
||||
由于 Broker 是集群部署的,所以消费关系通常维护在公共存储上,可以基于 Zookeeper、Apollo 等配置中心来管理以及进行变更通知。
|
||||
|
||||
**难点5:高性能设计**
|
||||
|
||||
存储的高性能前面已经谈过了,当然还可以从其他方面进一步优化性能。比如 Reactor 网络 IO 模型、业务线程池的设计、生产端的批量发送、Broker 端的异步刷盘、消费端的批量拉取等等
|
||||
213
Go/5 GoWeb开发/kafka/1 kafka入门.md
Normal file
@@ -0,0 +1,213 @@
|
||||
|
||||
> 参考文献
|
||||
> * [原文链接:《超详细“零”基础kafka入门篇》](https://www.cnblogs.com/along21/p/10278100.html)
|
||||
|
||||
|
||||
|
||||
## 1 认识 kafka
|
||||
|
||||
**Kafka 是一个分布式流媒体平台**
|
||||
|
||||
|
||||
### 流媒体平台有三个关键功能:
|
||||
|
||||
* **发布和订阅记录流**,类似于消息队列或企业消息传递系统。
|
||||
* 以容错的**持久方式存储记录流**。
|
||||
* **记录发生时处理流**。
|
||||
|
||||
|
||||
### Kafka 通常用于两大类应用:
|
||||
|
||||
* 构建可在系统或应用程序之间可靠获取数据的**实时流数据管道**
|
||||
* 构建转换或响应数据流的**实时流应用程序**
|
||||
|
||||
|
||||
|
||||
### 几个概念:
|
||||
|
||||
* Kafka 作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
|
||||
* Kafka 集群以称为 `topics主题` 的类别存储记录流。
|
||||
* 每条记录都包含一个 **`键`**,一个 **`值`** 和一个 **`时间戳`**。
|
||||
|
||||
|
||||
### Kafka 有四个核心 API:
|
||||
|
||||
* **`Producer API(生产者API)`**: 允许应用程序发布记录流至一个或多个 kafka 的 `topics(主题)`。
|
||||
* **`Consumer API(消费者API)`**: 允许应用程序订阅一个或多个 `topics(主题)`,并处理所产生的对他们记录的数据流。
|
||||
* **`Streams API(流API)`**: 允许应用程序充当流处理器,从一个或多个 `topics(主题)`消耗的输入流,并产生一个输出流至一个或多个输出的 `topics(主题)`,有效地变换所述输入流,以输出流。
|
||||
* **`Connector API(连接器API)`**: 允许构建和运行 kafka `topics(主题)` 连接到现有的应用程序或数据系统中重用生产者或消费者。例如,关系数据库的连接器可能捕获对表的每个更改。
|
||||
|
||||

|
||||
|
||||
在 Kafka 中,客户端和服务器之间的通信是通过简单,高性能,语言无关的 **TCP 协议** 完成的。此协议已版本化并保持与旧版本的向后兼容性。Kafka 提供 Java 客户端,但客户端有多种语言版本。
|
||||
|
||||
## 2 Topics 主题 和 partitions 分区
|
||||
|
||||
我们首先深入了解 Kafka 为记录流提供的核心抽象 - `主题topics`
|
||||
|
||||
* 一个 `Topic` 可以认为是一类消息,每个 `topic` 将被分成多个 `partition(区)`,每个 `partition` 在存储层面是 `append log` 文件
|
||||
|
||||
* **topic 主题是发布记录的类别或订阅源名称**。Kafka 的主题总是多用户; 也就是说,一个主题可以有零个,一个或多个消费者订阅写入它的数据。
|
||||
|
||||
对于每个主题,Kafka 群集都维护一个如下所示的 `分区` 日志:
|
||||
|
||||

|
||||
|
||||
|
||||
**每个分区都是一个有序的**,不可变的记录序列,不断附加到结构化的提交日志中。分区中的记录每个都分配了一个称为**偏移**的**顺序ID号**,**它唯一地标识分区中的每个记录**。
|
||||
|
||||
**Kafka 集群持久保存所有已发布的记录,直到保留期结束才会丢弃记录**。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。
|
||||
|
||||

|
||||
|
||||
实际上,**基于每个消费者保留的唯一元数据是该消费者在日志中的偏移或位置**。这种偏移由消费者控制:通常消费者在读取记录时会线性地提高其偏移量,但事实上,由于该位置由消费者控制,因此它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为较旧的偏移量来重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。
|
||||
|
||||
这些功能组合意味着 Kafka `消费者consumers` 非常 cheap - 他们可以来来往往对集群或其他消费者没有太大影响。例如,您可以使用我们的命令行工具 “tail” 任何主题的内容,而无需更改任何现有使用者所消耗的内容。
|
||||
|
||||
日志中的分区有多种用途。首先,它们允许日志扩展到超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们充当了并行性的单位 - 更多的是它。
|
||||
|
||||
## 3 Distribution 分配
|
||||
|
||||
一个 `Topic` 的多个 `partitions`, 被分布在 kafka 集群中的多个 `server` 上; 每个`server( kafka 实例)` 负责 `partitions` 中消息的读写操作; 此外 kafka 还可以配置 `partitions` 需要备份的个数 (`replicas`),每个 `partition` 将会被备份到多台机器上,以提高可用性.
|
||||
|
||||
基于 `replicated` 方案,那么就意味着需要对多个备份进行调度;
|
||||
|
||||
每个 `partition` 都有一个 `server` 为 `leader`; `leader` 负责所有的读写操作,如果`leader` 失效,那么将会有其他 `follower` 来接管(成为新的 `leader` );
|
||||
|
||||
`follower`只是单调的和 `leader` 跟进,同步消息即可.
|
||||
|
||||
由此可见作为 `leader` 的 `server` 承载了全部的请求压力,因此从集群的整体考虑,有多少个 `partitions` 就意味着有多少个 `leader` ,kafka 会将 `leader` 均衡的分散在每个实例上,来确保整体的性能稳定。
|
||||
|
||||
|
||||
## 4 Producers 生产者和 Consumers 消费者
|
||||
|
||||
### Producers 生产者
|
||||
|
||||
`Producers` 将数据发布到指定的 `topics 主题`。同时 `Producer` 也能决定将此消息归属于哪个`partition`; 比如基于 `round-robin` 方式或者通过其他的一些算法等。
|
||||
|
||||
|
||||
### Consumer消费者
|
||||
|
||||
* 本质上 kafka 只支持 `Topic` .每个 `consumer` 属于一个 `consumer group` ;反过来说,每个`group` 中可以有多个 `consumer` .发送到 `Topic` 的消息,只会被订阅此 `Topic` 的每个 `group` 中的一个 `consumer`消费。
|
||||
|
||||
* 如果所有使用者实例具有**相同的使用者组**,则记录将有效地在使用者实例上进行**负载平衡**。
|
||||
|
||||
* 如果所有消费者实例具有**不同的消费者组**,则每个记录将**广播**到所有消费者进程。
|
||||
|
||||

|
||||
|
||||
|
||||
上图中,有两个服务器 Kafka 集群,托管四个分区(P0-P3),包含两个使用者组。消费者组 A 有两个消费者实例,B 组有四个消费者实例。
|
||||
|
||||
|
||||
在 Kafka 中实现消费 `consumption` 的方式是通过**在消费者实例上划分日志中的分区,以便每个实例在任何时间点都是分配的“公平份额”的独占消费者**。维护组中成员资格的过程由 Kafka 协议动态处理。如果新实例加入该组,他们将从该组的其他成员接管一些分区; 如果实例死亡,其分区将分发给其余实例。
|
||||
|
||||
**Kafka 仅提供分区内记录的总订单,而不是主题中不同分区之间的记录**。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。但是,如果您需要对记录进行总订单,则可以使用仅包含一个分区的主题来实现,但这将意味着每个使用者组只有一个使用者进程。
|
||||
|
||||
## 5 Consumers kafka 确保
|
||||
|
||||
* 发送到 `partitions` 中的消息将会按照它接收的顺序追加到日志中。也就是说,如果记录 M1 由与记录 M2 相同的生成者发送,并且首先发送 M1 ,则 M1 将具有比 M2 更低的偏移并且在日志中更早出现。
|
||||
|
||||
* 消费者实例按照它们存储在日志中的顺序查看记录。对于消费者而言,它们消费消息的顺序和日志中消息顺序一致。
|
||||
|
||||
* 如果 `Topic` 的 `replicationfactor` 为 N ,那么允许 N-1 个 kafka 实例失效,我们将容忍最多 N-1 个服务器故障,而不会丢失任何提交到日志的记录。
|
||||
|
||||
|
||||
## 6 kafka 作为消息系统
|
||||
|
||||
Kafka 的流概念与传统的企业邮件系统相比如何?
|
||||
|
||||
### 与传统消息系统对比
|
||||
|
||||
消息传统上有两种模型:`queuing`(队列) 和`publish-subscribe`(发布 - 订阅)。
|
||||
|
||||
* 在 `队列` 中,消费者池可以从服务器读取并且每个记录转到其中一个;
|
||||
* `在发布 - 订阅`中,记录被广播给所有消费者。
|
||||
|
||||
这两种模型中的每一种都有优点和缺点。`队列` 的优势在于它允许您在多个消费者实例上划分数据处理,从而可以扩展您的处理。不幸的是,一旦一个进程读取它已经消失的数据,队列就不是多用户。`发布 - 订阅` 允许您将数据广播到多个进程,但由于每条消息都发送给每个订阅者,因此无法进行扩展处理。
|
||||
|
||||
kafka 的消费者群体概念概括了这两个概念。与队列一样,使用者组允许您将处理划分为一组进程(使用者组的成员)。与 `发布 - 订阅` 一样,Kafka 允许您向多个消费者组广播消息。
|
||||
|
||||
|
||||
|
||||
### kafka 的优势
|
||||
|
||||
Kafka 模型的优势在于每个主题都具有这些属性 - 它可以扩展处理并且也是多用户 - 不需要选择其中一个。
|
||||
|
||||
与传统的消息系统相比,Kafka 具有更强的订购保证。
|
||||
|
||||
传统队列在服务器上按顺序保留记录,如果多个消费者从队列中消耗,则服务器按照存储顺序分发记录。但是,虽然服务器按顺序分发记录,但是记录是异步传递给消费者的,因此它们可能会在不同的消费者处出现故障。这实际上意味着在存在并行消耗的情况下丢失记录的顺序。消息传递系统通常通过具有“独占消费者”概念来解决这个问题,该概念只允许一个进程从队列中消耗,但当然这意味着处理中没有并行性。
|
||||
|
||||
kafka 做得更好。通过在主题中具有并行性概念 - 分区 - ,Kafka 能够在消费者流程池中提供订购保证和负载平衡。这是通过将主题中的分区分配给使用者组中的使用者来实现的,以便每个分区仅由该组中的一个使用者使用。通过这样做,我们确保使用者是该分区的唯一读者并按顺序使用数据。由于有许多分区,这仍然可以平衡许多消费者实例的负载。但请注意,消费者组中的消费者实例不能超过分区。
|
||||
|
||||
|
||||
|
||||
## 7 kafka作为存储系统
|
||||
任何允许发布与消费消息分离的消息的消息队列实际上充当了正在进行的消息的存储系统。Kafka的不同之处在于它是一个非常好的存储系统。
|
||||
|
||||
写入Kafka的数据将写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便在完全复制之前写入不被认为是完整的,并且即使写入的服务器失败也保证写入仍然存在。
|
||||
|
||||
磁盘结构Kafka很好地使用了规模 - 无论服务器上有50 KB还是50 TB的持久数据,Kafka都会执行相同的操作。
|
||||
|
||||
由于认真对待存储并允许客户端控制其读取位置,您可以将Kafka视为一种专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。
|
||||
|
||||
有关Kafka的提交日志存储和复制设计的详细信息,请阅读此页面。
|
||||
|
||||
|
||||
## 8 kafka用于流处理
|
||||
仅仅读取,写入和存储数据流是不够的,目的是实现流的实时处理。
|
||||
|
||||
在Kafka中,流处理器是指从输入主题获取连续数据流,对此输入执行某些处理以及生成连续数据流以输出主题的任何内容。
|
||||
|
||||
例如,零售应用程序可能会接收销售和发货的输入流,并输出重新排序流和根据此数据计算的价格调整。
|
||||
|
||||
可以使用生产者和消费者API直接进行简单处理。但是,对于更复杂的转换,Kafka提供了完全集成的Streams API。这允许构建执行非平凡处理的应用程序,这些应用程序可以计算流的聚合或将流连接在一起。
|
||||
|
||||
此工具有助于解决此类应用程序面临的难题:处理无序数据,在代码更改时重新处理输入,执行有状态计算等。
|
||||
|
||||
流API构建在Kafka提供的核心原语上:它使用生产者和消费者API进行输入,使用Kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。
|
||||
|
||||
|
||||
## 10 kafka使用场景
|
||||
### 1 消息Messaging
|
||||
Kafka可以替代更传统的消息代理。消息代理的使用有多种原因(将处理与数据生成器分离,缓冲未处理的消息等)。与大多数消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和容错功能,这使其成为大规模消息处理应用程序的理想解决方案。
|
||||
|
||||
根据经验,消息传递的使用通常相对较低,但可能需要较低的端到端延迟,并且通常取决于Kafka提供的强大的耐用性保证。
|
||||
|
||||
在这个领域,Kafka可与传统的消息传递系统(如ActiveMQ或 RabbitMQ)相媲美。
|
||||
|
||||
|
||||
|
||||
### 2 网站活动跟踪
|
||||
Kafka的原始用例是能够将用户活动跟踪管道重建为一组实时发布 - 订阅源。这意味着站点活动(页面查看,搜索或用户可能采取的其他操作)将发布到中心主题,每个活动类型包含一个主题。这些源可用于订购一系列用例,包括实时处理,实时监控以及加载到Hadoop或离线数据仓库系统以进行脱机处理和报告。
|
||||
|
||||
活动跟踪通常非常高,因为为每个用户页面视图生成了许多活动消息。
|
||||
|
||||
|
||||
|
||||
### 3 度量Metrics
|
||||
Kafka通常用于运营监控数据。这涉及从分布式应用程序聚合统计信息以生成操作数据的集中式提要。
|
||||
|
||||
|
||||
|
||||
### 4 日志聚合
|
||||
许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在中央位置(可能是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,并将日志或事件数据作为消息流更清晰地抽象出来。这允许更低延迟的处理并更容易支持多个数据源和分布式数据消耗。与Scribe或Flume等以日志为中心的系统相比,Kafka提供了同样出色的性能,由于复制而具有更强的耐用性保证,以及更低的端到端延迟。
|
||||
|
||||
|
||||
|
||||
### 5 流处理
|
||||
许多Kafka用户在处理由多个阶段组成的管道时处理数据,其中原始输入数据从Kafka主题中消费,然后聚合,丰富或以其他方式转换为新主题以供进一步消费或后续处理。
|
||||
|
||||
例如,用于推荐新闻文章的处理管道可以从RSS订阅源抓取文章内容并将其发布到“文章”主题; 进一步处理可能会对此内容进行规范化或重复数据删除,并将已清理的文章内容发布到新主题; 最终处理阶段可能会尝试向用户推荐此内容。此类处理管道基于各个主题创建实时数据流的图形。从0.10.0.0开始,这是一个轻量级但功能强大的流处理库,名为Kafka Streams 在Apache Kafka中可用于执行如上所述的此类数据处理。除了Kafka Streams之外,其他开源流处理工具包括Apache Storm和 Apache Samza。
|
||||
|
||||
|
||||
|
||||
### 6 Event Sourcing
|
||||
Event Sourcing是一种应用程序设计风格,其中状态更改记录为按时间排序的记录序列。Kafka对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端。
|
||||
|
||||
|
||||
|
||||
### 7 提交日志
|
||||
Kafka可以作为分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka中的日志压缩功能有助于支持此用法。在这种用法中,Kafka类似于Apache BookKeeper项目。
|
||||
|
||||
|
||||
@@ -1,738 +0,0 @@
|
||||
[原文链接:《超详细“零”基础kafka入门篇》](https://www.cnblogs.com/along21/p/10278100.html)
|
||||
|
||||
> 原文中很多内容读起来不通顺,名词一会儿用中文,一会儿用英文,比如 consumer ,文中部分位置将其翻译为 消费者,部分位置又翻译为 使用者,所以,严重怀疑文章是机翻的。虽然内容很全很细,但读起来很累😢
|
||||
>
|
||||
> CnPeng 2020-12-22 我读不下去了。。。。
|
||||
|
||||
---
|
||||
|
||||
## 1.1 认识 kafka
|
||||
|
||||
**Kafka 是一个分布式流媒体平台**
|
||||
|
||||
kafka官网:[http://kafka.apache.org/](http://kafka.apache.org/)
|
||||
|
||||
### 1.1.1 kafka 简介
|
||||
|
||||
#### 1.1.1.1 流媒体平台有三个关键功能:
|
||||
|
||||
* **发布和订阅记录流**,类似于消息队列或企业消息传递系统。
|
||||
* 以容错的**持久方式存储记录流**。
|
||||
* **记录发生时处理流**。
|
||||
|
||||
|
||||
#### 1.1.1.2 Kafka 通常用于两大类应用:
|
||||
|
||||
* 构建可在系统或应用程序之间可靠获取数据的**实时流数据管道**
|
||||
* 构建转换或响应数据流的**实时流应用程序**
|
||||
|
||||
要了解 Kafka 如何做这些事情,让我们深入探讨 Kafka 的能力。
|
||||
|
||||
|
||||
|
||||
#### 1.1.1.2 首先是几个概念:
|
||||
|
||||
* Kafka 作为一个集群运行在一个或多个可跨多个数据中心的服务器上。
|
||||
* Kafka 集群以称为 `topics主题` 的类别存储记录流。
|
||||
* 每条记录都包含一个 **`键`**,一个 **`值`** 和一个 **`时间戳`**。
|
||||
|
||||
|
||||
#### 1.1.1.3 Kafka 有四个核心 API:
|
||||
|
||||
* **`Producer API(生产者API)`**: 允许应用程序发布记录流至一个或多个 kafka 的 `topics(主题)`。
|
||||
* **`Consumer API(消费者API)`**: 允许应用程序订阅一个或多个 `topics(主题)`,并处理所产生的对他们记录的数据流。
|
||||
* **`Streams API(流API)`**: 允许应用程序充当流处理器,从一个或多个 `topics(主题)`消耗的输入流,并产生一个输出流至一个或多个输出的 `topics(主题)`,有效地变换所述输入流,以输出流。
|
||||
* **`Connector API(连接器API)`**: 允许构建和运行 kafka `topics(主题)` 连接到现有的应用程序或数据系统中重用生产者或消费者。例如,关系数据库的连接器可能捕获对表的每个更改。
|
||||
|
||||

|
||||
|
||||
在 Kafka 中,客户端和服务器之间的通信是通过简单,高性能,语言无关的 **TCP 协议** 完成的。此协议已版本化并保持与旧版本的向后兼容性。Kafka 提供 Java 客户端,但客户端有多种语言版本。
|
||||
|
||||
## 1.2 Topics 主题 和 partitions 分区
|
||||
|
||||
我们首先深入了解 Kafka 为记录流提供的核心抽象 - `主题topics`
|
||||
|
||||
* 一个 `Topic` 可以认为是一类消息,每个 `topic` 将被分成多个 `partition(区)`,每个 `partition` 在存储层面是 `append log` 文件
|
||||
|
||||
* **topic 主题是发布记录的类别或订阅源名称**。Kafka 的主题总是多用户; 也就是说,一个主题可以有零个,一个或多个消费者订阅写入它的数据。
|
||||
|
||||
对于每个主题,Kafka 群集都维护一个如下所示的 `分区` 日志:
|
||||
|
||||

|
||||
|
||||
|
||||
**每个分区都是一个有序的**,不可变的记录序列,不断附加到结构化的提交日志中。分区中的记录每个都分配了一个称为**偏移**的**顺序ID号**,**它唯一地标识分区中的每个记录**。
|
||||
|
||||
**Kafka 集群持久保存所有已发布的记录,直到保留期结束才会丢弃记录**。例如,如果保留策略设置为两天,则在发布记录后的两天内,它可供使用,之后将被丢弃以释放空间。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据不是问题。
|
||||
|
||||

|
||||
|
||||
实际上,**基于每个消费者保留的唯一元数据是该消费者在日志中的偏移或位置**。这种偏移由消费者控制:通常消费者在读取记录时会线性地提高其偏移量,但事实上,由于该位置由消费者控制,因此它可以按照自己喜欢的任何顺序消费记录。例如,消费者可以重置为较旧的偏移量来重新处理过去的数据,或者跳到最近的记录并从“现在”开始消费。
|
||||
|
||||
这些功能组合意味着 Kafka `消费者consumers` 非常 cheap - 他们可以来来往往对集群或其他消费者没有太大影响。例如,您可以使用我们的命令行工具 “tail” 任何主题的内容,而无需更改任何现有使用者所消耗的内容。
|
||||
|
||||
日志中的分区有多种用途。首先,它们允许日志扩展到超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但主题可能有许多分区,因此它可以处理任意数量的数据。其次,它们充当了并行性的单位 - 更多的是它。
|
||||
|
||||
## 1.3 Distribution 分配
|
||||
|
||||
一个 `Topic` 的多个 `partitions`, 被分布在 kafka 集群中的多个 `server` 上; 每个`server( kafka 实例)` 负责 `partitions` 中消息的读写操作; 此外 kafka 还可以配置 `partitions` 需要备份的个数 (`replicas`),每个 `partition` 将会被备份到多台机器上,以提高可用性.
|
||||
|
||||
基于 `replicated` 方案,那么就意味着需要对多个备份进行调度;
|
||||
|
||||
每个 `partition` 都有一个 `server` 为 `leader`; `leader` 负责所有的读写操作,如果`leader` 失效,那么将会有其他 `follower` 来接管(成为新的 `leader` );
|
||||
|
||||
`follower`只是单调的和 `leader` 跟进,同步消息即可.
|
||||
|
||||
由此可见作为 `leader` 的 `server` 承载了全部的请求压力,因此从集群的整体考虑,有多少个 `partitions` 就意味着有多少个 `leader` ,kafka 会将 `leader` 均衡的分散在每个实例上,来确保整体的性能稳定。
|
||||
|
||||
|
||||
## 1.4 Producers 生产者和 Consumers 消费者
|
||||
|
||||
### 1.4.1 Producers 生产者
|
||||
|
||||
`Producers` 将数据发布到指定的 `topics 主题`。同时 `Producer` 也能决定将此消息归属于哪个`partition`; 比如基于 `round-robin` 方式或者通过其他的一些算法等。
|
||||
|
||||
|
||||
### 1.4.2 Consumers
|
||||
|
||||
* 本质上 kafka 只支持 `Topic` .每个 `consumer` 属于一个 `consumer group` ;反过来说,每个`group` 中可以有多个 `consumer` .发送到 `Topic` 的消息,只会被订阅此 `Topic` 的每个 `group` 中的一个 `consumer`消费。
|
||||
|
||||
* 如果所有使用者实例具有**相同的使用者组**,则记录将有效地在使用者实例上进行**负载平衡**。
|
||||
|
||||
* 如果所有消费者实例具有**不同的消费者组**,则每个记录将**广播**到所有消费者进程。
|
||||
|
||||

|
||||
|
||||
|
||||
上图中,有两个服务器 Kafka 集群,托管四个分区(P0-P3),包含两个使用者组。消费者组 A 有两个消费者实例,B 组有四个消费者实例。
|
||||
|
||||
|
||||
在 Kafka 中实现消费 `consumption` 的方式是通过**在消费者实例上划分日志中的分区,以便每个实例在任何时间点都是分配的“公平份额”的独占消费者**。维护组中成员资格的过程由 Kafka 协议动态处理。如果新实例加入该组,他们将从该组的其他成员接管一些分区; 如果实例死亡,其分区将分发给其余实例。
|
||||
|
||||
**Kafka 仅提供分区内记录的总订单,而不是主题中不同分区之间的记录**。对于大多数应用程序而言,按分区排序与按键分区数据的能力相结合就足够了。但是,如果您需要对记录进行总订单,则可以使用仅包含一个分区的主题来实现,但这将意味着每个使用者组只有一个使用者进程。
|
||||
|
||||
## 1.5 Consumers kafka 确保
|
||||
|
||||
* 发送到 `partitions` 中的消息将会按照它接收的顺序追加到日志中。也就是说,如果记录 M1 由与记录 M2 相同的生成者发送,并且首先发送 M1 ,则 M1 将具有比 M2 更低的偏移并且在日志中更早出现。
|
||||
|
||||
* 消费者实例按照它们存储在日志中的顺序查看记录。对于消费者而言,它们消费消息的顺序和日志中消息顺序一致。
|
||||
|
||||
* 如果 `Topic` 的 `replicationfactor` 为 N ,那么允许 N-1 个 kafka 实例失效,我们将容忍最多 N-1 个服务器故障,而不会丢失任何提交到日志的记录。
|
||||
|
||||
|
||||
## 1.6 kafka 作为消息系统
|
||||
|
||||
Kafka 的流概念与传统的企业邮件系统相比如何?
|
||||
|
||||
### 1.6.1 与传统消息系统对比
|
||||
|
||||
消息传统上有两种模型:`queuing`(队列) 和`publish-subscribe`(发布 - 订阅)。
|
||||
|
||||
* 在 `队列` 中,消费者池可以从服务器读取并且每个记录转到其中一个;
|
||||
* `在发布 - 订阅`中,记录被广播给所有消费者。
|
||||
|
||||
这两种模型中的每一种都有优点和缺点。`队列` 的优势在于它允许您在多个消费者实例上划分数据处理,从而可以扩展您的处理。不幸的是,一旦一个进程读取它已经消失的数据,队列就不是多用户。`发布 - 订阅` 允许您将数据广播到多个进程,但由于每条消息都发送给每个订阅者,因此无法进行扩展处理。
|
||||
|
||||
kafka 的消费者群体概念概括了这两个概念。与队列一样,使用者组允许您将处理划分为一组进程(使用者组的成员)。与 `发布 - 订阅` 一样,Kafka 允许您向多个消费者组广播消息。
|
||||
|
||||
|
||||
|
||||
### 1.6.2 kafka 的优势
|
||||
|
||||
Kafka 模型的优势在于每个主题都具有这些属性 - 它可以扩展处理并且也是多用户 - 不需要选择其中一个。
|
||||
|
||||
与传统的消息系统相比,Kafka 具有更强的订购保证。
|
||||
|
||||
传统队列在服务器上按顺序保留记录,如果多个消费者从队列中消耗,则服务器按照存储顺序分发记录。但是,虽然服务器按顺序分发记录,但是记录是异步传递给消费者的,因此它们可能会在不同的消费者处出现故障。这实际上意味着在存在并行消耗的情况下丢失记录的顺序。消息传递系统通常通过具有“独占消费者”概念来解决这个问题,该概念只允许一个进程从队列中消耗,但当然这意味着处理中没有并行性。
|
||||
|
||||
kafka 做得更好。通过在主题中具有并行性概念 - 分区 - ,Kafka 能够在消费者流程池中提供订购保证和负载平衡。这是通过将主题中的分区分配给使用者组中的使用者来实现的,以便每个分区仅由该组中的一个使用者使用。通过这样做,我们确保使用者是该分区的唯一读者并按顺序使用数据。由于有许多分区,这仍然可以平衡许多消费者实例的负载。但请注意,消费者组中的消费者实例不能超过分区。
|
||||
|
||||
|
||||
TODO
|
||||
|
||||
1.7 kafka作为存储系统
|
||||
任何允许发布与消费消息分离的消息的消息队列实际上充当了正在进行的消息的存储系统。Kafka的不同之处在于它是一个非常好的存储系统。
|
||||
写入Kafka的数据将写入磁盘并进行复制以实现容错。Kafka允许生产者等待确认,以便在完全复制之前写入不被认为是完整的,并且即使写入的服务器失败也保证写入仍然存在。
|
||||
磁盘结构Kafka很好地使用了规模 - 无论服务器上有50 KB还是50 TB的持久数据,Kafka都会执行相同的操作。
|
||||
由于认真对待存储并允许客户端控制其读取位置,您可以将Kafka视为一种专用于高性能,低延迟提交日志存储,复制和传播的专用分布式文件系统。
|
||||
有关Kafka的提交日志存储和复制设计的详细信息,请阅读此页面。
|
||||
|
||||
|
||||
1.8 kafka用于流处理
|
||||
仅仅读取,写入和存储数据流是不够的,目的是实现流的实时处理。
|
||||
在Kafka中,流处理器是指从输入主题获取连续数据流,对此输入执行某些处理以及生成连续数据流以输出主题的任何内容。
|
||||
例如,零售应用程序可能会接收销售和发货的输入流,并输出重新排序流和根据此数据计算的价格调整。
|
||||
可以使用生产者和消费者API直接进行简单处理。但是,对于更复杂的转换,Kafka提供了完全集成的Streams API。这允许构建执行非平凡处理的应用程序,这些应用程序可以计算流的聚合或将流连接在一起。
|
||||
此工具有助于解决此类应用程序面临的难题:处理无序数据,在代码更改时重新处理输入,执行有状态计算等。
|
||||
流API构建在Kafka提供的核心原语上:它使用生产者和消费者API进行输入,使用Kafka进行有状态存储,并在流处理器实例之间使用相同的组机制来实现容错。
|
||||
|
||||
|
||||
2、kafka使用场景
|
||||
2.1 消息Messaging
|
||||
Kafka可以替代更传统的消息代理。消息代理的使用有多种原因(将处理与数据生成器分离,缓冲未处理的消息等)。与大多数消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和容错功能,这使其成为大规模消息处理应用程序的理想解决方案。
|
||||
|
||||
根据经验,消息传递的使用通常相对较低,但可能需要较低的端到端延迟,并且通常取决于Kafka提供的强大的耐用性保证。
|
||||
|
||||
在这个领域,Kafka可与传统的消息传递系统(如ActiveMQ或 RabbitMQ)相媲美。
|
||||
|
||||
|
||||
|
||||
2.2 网站活动跟踪
|
||||
Kafka的原始用例是能够将用户活动跟踪管道重建为一组实时发布 - 订阅源。这意味着站点活动(页面查看,搜索或用户可能采取的其他操作)将发布到中心主题,每个活动类型包含一个主题。这些源可用于订购一系列用例,包括实时处理,实时监控以及加载到Hadoop或离线数据仓库系统以进行脱机处理和报告。
|
||||
|
||||
活动跟踪通常非常高,因为为每个用户页面视图生成了许多活动消息。
|
||||
|
||||
|
||||
|
||||
2.3 度量Metrics
|
||||
Kafka通常用于运营监控数据。这涉及从分布式应用程序聚合统计信息以生成操作数据的集中式提要。
|
||||
|
||||
|
||||
|
||||
2.4 日志聚合
|
||||
许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在中央位置(可能是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,并将日志或事件数据作为消息流更清晰地抽象出来。这允许更低延迟的处理并更容易支持多个数据源和分布式数据消耗。与Scribe或Flume等以日志为中心的系统相比,Kafka提供了同样出色的性能,由于复制而具有更强的耐用性保证,以及更低的端到端延迟。
|
||||
|
||||
|
||||
|
||||
2.5 流处理
|
||||
许多Kafka用户在处理由多个阶段组成的管道时处理数据,其中原始输入数据从Kafka主题中消费,然后聚合,丰富或以其他方式转换为新主题以供进一步消费或后续处理。
|
||||
|
||||
例如,用于推荐新闻文章的处理管道可以从RSS订阅源抓取文章内容并将其发布到“文章”主题; 进一步处理可能会对此内容进行规范化或重复数据删除,并将已清理的文章内容发布到新主题; 最终处理阶段可能会尝试向用户推荐此内容。此类处理管道基于各个主题创建实时数据流的图形。从0.10.0.0开始,这是一个轻量级但功能强大的流处理库,名为Kafka Streams 在Apache Kafka中可用于执行如上所述的此类数据处理。除了Kafka Streams之外,其他开源流处理工具包括Apache Storm和 Apache Samza。
|
||||
|
||||
|
||||
|
||||
2.6 Event Sourcing
|
||||
Event Sourcing是一种应用程序设计风格,其中状态更改记录为按时间排序的记录序列。Kafka对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端。
|
||||
|
||||
|
||||
|
||||
2.7 提交日志
|
||||
Kafka可以作为分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka中的日志压缩功能有助于支持此用法。在这种用法中,Kafka类似于Apache BookKeeper项目。
|
||||
|
||||
|
||||
|
||||
3、kafka安装
|
||||
3.1 下载安装
|
||||
到官网http://kafka.apache.org/downloads.html下载想要的版本;我这里下载的最新稳定版2.1.0
|
||||
|
||||
注:由于Kafka控制台脚本对于基于Unix和Windows的平台是不同的,因此在Windows平台上使用bin\windows\ 而不是bin/ 将脚本扩展名更改为.bat。
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# wget http://mirrors.shu.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz
|
||||
[root@along ~]# tar -C /data/ -xvf kafka_2.11-2.1.0.tgz
|
||||
[root@along ~]# cd /data/kafka_2.11-2.1.0/
|
||||
|
||||
|
||||
3.2 配置启动zookeeper
|
||||
kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。
|
||||
|
||||
(1)zookeeper需要java环境
|
||||
|
||||
1
|
||||
[root@along ~]# yum -y install java-1.8.0
|
||||
|
||||
|
||||
(2)这里kafka下载包已经包括zookeeper服务,所以只需修改配置文件,启动即可。
|
||||
|
||||
如果需要下载指定zookeeper版本;可以单独去zookeeper官网http://mirrors.shu.edu.cn/apache/zookeeper/下载指定版本。
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
[root@along ~]# cd /data/kafka_2.11-2.1.0/
|
||||
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/zookeeper.properties
|
||||
dataDir=/tmp/zookeeper #数据存储目录
|
||||
clientPort=2181 #zookeeper端口
|
||||
maxClientCnxns=0
|
||||
注:可自行添加修改zookeeper配置
|
||||
|
||||
|
||||
|
||||
3.3 配置kafka
|
||||
(1)修改配置文件
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
10
|
||||
11
|
||||
12
|
||||
13
|
||||
14
|
||||
15
|
||||
16
|
||||
17
|
||||
18
|
||||
19
|
||||
20
|
||||
[root@along kafka_2.11-2.1.0]# grep "^[^#]" config/server.properties
|
||||
broker.id=0
|
||||
listeners=PLAINTEXT://localhost:9092
|
||||
num.network.threads=3
|
||||
num.io.threads=8
|
||||
socket.send.buffer.bytes=102400
|
||||
socket.receive.buffer.bytes=102400
|
||||
socket.request.max.bytes=104857600
|
||||
log.dirs=/tmp/kafka-logs
|
||||
num.partitions=1
|
||||
num.recovery.threads.per.data.dir=1
|
||||
offsets.topic.replication.factor=1
|
||||
transaction.state.log.replication.factor=1
|
||||
transaction.state.log.min.isr=1
|
||||
log.retention.hours=168
|
||||
log.segment.bytes=1073741824
|
||||
log.retention.check.interval.ms=300000
|
||||
zookeeper.connect=localhost:2181
|
||||
zookeeper.connection.timeout.ms=6000
|
||||
group.initial.rebalance.delay.ms=0
|
||||
注:可根据自己需求修改配置文件
|
||||
|
||||
broker.id:唯一标识ID
|
||||
listeners=PLAINTEXT://localhost:9092:kafka服务监听地址和端口
|
||||
log.dirs:日志存储目录
|
||||
zookeeper.connect:指定zookeeper服务
|
||||
|
||||
|
||||
(2)配置环境变量
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
[root@along ~]# vim /etc/profile.d/kafka.sh
|
||||
export KAFKA_HOME="/data/kafka_2.11-2.1.0"
|
||||
export PATH="${KAFKA_HOME}/bin:$PATH"
|
||||
[root@along ~]# source /etc/profile.d/kafka.sh
|
||||
|
||||
|
||||
(3)配置服务启动脚本
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
10
|
||||
11
|
||||
12
|
||||
13
|
||||
14
|
||||
15
|
||||
16
|
||||
17
|
||||
18
|
||||
19
|
||||
20
|
||||
21
|
||||
22
|
||||
23
|
||||
24
|
||||
25
|
||||
26
|
||||
27
|
||||
28
|
||||
29
|
||||
30
|
||||
31
|
||||
32
|
||||
33
|
||||
34
|
||||
35
|
||||
36
|
||||
37
|
||||
38
|
||||
39
|
||||
40
|
||||
41
|
||||
42
|
||||
43
|
||||
44
|
||||
45
|
||||
46
|
||||
47
|
||||
48
|
||||
49
|
||||
50
|
||||
51
|
||||
52
|
||||
53
|
||||
54
|
||||
55
|
||||
56
|
||||
57
|
||||
58
|
||||
59
|
||||
60
|
||||
61
|
||||
62
|
||||
63
|
||||
64
|
||||
[root@along ~]# vim /etc/init.d/kafka
|
||||
#!/bin/sh
|
||||
#
|
||||
# chkconfig: 345 99 01
|
||||
# description: Kafka
|
||||
#
|
||||
# File : Kafka
|
||||
#
|
||||
# Description: Starts and stops the Kafka server
|
||||
#
|
||||
|
||||
source /etc/rc.d/init.d/functions
|
||||
|
||||
KAFKA_HOME=/data/kafka_2.11-2.1.0
|
||||
KAFKA_USER=root
|
||||
export LOG_DIR=/tmp/kafka-logs
|
||||
|
||||
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
|
||||
|
||||
# See how we were called.
|
||||
case "$1" in
|
||||
|
||||
start)
|
||||
echo -n "Starting Kafka:"
|
||||
/sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
|
||||
echo " done."
|
||||
exit 0
|
||||
;;
|
||||
|
||||
stop)
|
||||
echo -n "Stopping Kafka: "
|
||||
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"
|
||||
echo " done."
|
||||
exit 0
|
||||
;;
|
||||
hardstop)
|
||||
echo -n "Stopping (hard) Kafka: "
|
||||
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
|
||||
echo " done."
|
||||
exit 0
|
||||
;;
|
||||
|
||||
status)
|
||||
c_pid=`ps -ef | grep kafka.Kafka | grep -v grep | awk '{print $2}'`
|
||||
if [ "$c_pid" = "" ] ; then
|
||||
echo "Stopped"
|
||||
exit 3
|
||||
else
|
||||
echo "Running $c_pid"
|
||||
exit 0
|
||||
fi
|
||||
;;
|
||||
|
||||
restart)
|
||||
stop
|
||||
start
|
||||
;;
|
||||
|
||||
*)
|
||||
echo "Usage: kafka {start|stop|hardstop|status|restart}"
|
||||
exit 1
|
||||
;;
|
||||
|
||||
esac
|
||||
|
||||
|
||||
3.4 启动kafka服务
|
||||
(1)后台启动zookeeper服务
|
||||
|
||||
1
|
||||
[root@along ~]# nohup zookeeper-server-start.sh /data/kafka_2.11-2.1.0/config/zookeeper.properties &
|
||||
|
||||
|
||||
(2)启动kafka服务
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
[root@along ~]# service kafka start
|
||||
Starting kafka (via systemctl): [ OK ]
|
||||
[root@along ~]# service kafka status
|
||||
Running 86018
|
||||
[root@along ~]# ss -nutl
|
||||
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
|
||||
tcp LISTEN 0 50 :::9092 :::*
|
||||
tcp LISTEN 0 50 :::2181 :::*
|
||||
|
||||
|
||||
4、kafka使用简单入门
|
||||
4.1 创建主题topics
|
||||
创建一个名为“along”的主题,它只包含一个分区,只有一个副本:
|
||||
|
||||
1
|
||||
2
|
||||
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic along
|
||||
Created topic "along".
|
||||
如果我们运行list topic命令,我们现在可以看到该主题:
|
||||
|
||||
1
|
||||
2
|
||||
[root@along ~]# kafka-topics.sh --list --zookeeper localhost:2181
|
||||
along
|
||||
|
||||
|
||||
4.2 发送一些消息
|
||||
Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
|
||||
|
||||
运行生产者,然后在控制台中键入一些消息以发送到服务器。
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic along
|
||||
>This is a message
|
||||
>This is another message
|
||||
|
||||
|
||||
4.3 启动消费者
|
||||
Kafka还有一个命令行使用者,它会将消息转储到标准输出。
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic along --from-beginning
|
||||
This is a message
|
||||
This is another message
|
||||
所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。
|
||||
|
||||
|
||||
|
||||
5、设置多代理kafka群集
|
||||
到目前为止,我们一直在与一个broker运行,但这并不好玩。对于Kafka,单个代理只是一个大小为1的集群,因此除了启动一些代理实例之外没有太多变化。但是为了感受它,让我们将我们的集群扩展到三个节点(仍然在我们的本地机器上)。
|
||||
|
||||
5.1 准备配置文件
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
10
|
||||
11
|
||||
[root@along kafka_2.11-2.1.0]# cd /data/kafka_2.11-2.1.0/
|
||||
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-1.properties
|
||||
[root@along kafka_2.11-2.1.0]# cp config/server.properties config/server-2.properties
|
||||
[root@along kafka_2.11-2.1.0]# vim config/server-1.properties
|
||||
broker.id=1
|
||||
listeners=PLAINTEXT://:9093
|
||||
log.dirs=/tmp/kafka-logs-1
|
||||
[root@along kafka_2.11-2.1.0]# vim config/server-2.properties
|
||||
broker.id=2
|
||||
listeners=PLAINTEXT://:9094
|
||||
log.dirs=/tmp/kafka-logs-2
|
||||
注:该broker.id 属性是群集中每个节点的唯一且永久的名称。我们必须覆盖端口和日志目录,因为我们在同一台机器上运行这些,并且我们希望让所有代理尝试在同一端口上注册或覆盖彼此的数据。
|
||||
|
||||
|
||||
|
||||
5.2 开启集群另2个kafka服务
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-1.properties &
|
||||
[root@along ~]# nohup kafka-server-start.sh /data/kafka_2.11-2.1.0/config/server-2.properties &
|
||||
[root@along ~]# ss -nutl
|
||||
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
|
||||
tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::*
|
||||
tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
|
||||
tcp LISTEN 0 50 ::ffff:127.0.0.1:9094 :::*
|
||||
|
||||
|
||||
5.3 在集群中进行操作
|
||||
(1)现在创建一个复制因子为3的新主题my-replicated-topic
|
||||
|
||||
1
|
||||
2
|
||||
[root@along ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
|
||||
Created topic "my-replicated-topic".
|
||||
|
||||
|
||||
(2)在一个集群中,运行“describe topics”命令查看哪个broker正在做什么
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
|
||||
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
|
||||
Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
|
||||
注释:第一行给出了所有分区的摘要,每个附加行提供有关一个分区的信息。由于我们只有一个分区用于此主题,因此只有一行。
|
||||
|
||||
“leader”是负责给定分区的所有读取和写入的节点。每个节点将成为随机选择的分区部分的领导者。
|
||||
“replicas”是复制此分区日志的节点列表,无论它们是否为领导者,或者即使它们当前处于活动状态。
|
||||
“isr”是“同步”复制品的集合。这是副本列表的子集,该列表当前处于活跃状态并且已经被领导者捕获。
|
||||
请注意,Leader: 2,在我的示例中,节点2 是该主题的唯一分区的Leader。
|
||||
|
||||
|
||||
|
||||
(3)可以在我们创建的原始主题上运行相同的命令,以查看它的位置
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic along
|
||||
Topic:along PartitionCount:1 ReplicationFactor:1 Configs:
|
||||
Topic: along Partition: 0 Leader: 0 Replicas: 0 Isr: 0
|
||||
|
||||
|
||||
(4)向我们的新主题发布一些消息:
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
[root@along ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
|
||||
>my test message 1
|
||||
>my test message 2
|
||||
>^C
|
||||
|
||||
|
||||
(5)现在让我们使用这些消息:
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
|
||||
my test message 1
|
||||
my test message 2
|
||||
|
||||
|
||||
5.4 测试集群的容错性
|
||||
(1)现在让我们测试一下容错性。Broker 2 充当leader 所以让我们杀了它:
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
[root@along ~]# ps aux | grep server-2.properties |awk '{print $2}'
|
||||
106737
|
||||
[root@along ~]# kill -9 106737
|
||||
[root@along ~]# ss -nutl
|
||||
tcp LISTEN 0 50 ::ffff:127.0.0.1:9092 :::*
|
||||
tcp LISTEN 0 50 ::ffff:127.0.0.1:9093 :::*
|
||||
|
||||
|
||||
(2)leader 已切换到其中一个从属节点,节点2不再位于同步副本集中:
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
|
||||
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
|
||||
Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 2,0,1 Isr: 0,1
|
||||
|
||||
|
||||
(3)即使最初接受写入的leader 已经失败,这些消息仍可供消费:
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
|
||||
my test message 1
|
||||
my test message 2
|
||||
|
||||
|
||||
6、使用Kafka Connect导入/导出数据
|
||||
从控制台写入数据并将其写回控制台是一个方便的起点,但有时候可能希望使用其他来源的数据或将数据从Kafka导出到其他系统。对于许多系统,您可以使用Kafka Connect导入或导出数据,而不是编写自定义集成代码。
|
||||
|
||||
Kafka Connect是Kafka附带的工具,用于向Kafka导入和导出数据。它是一个可扩展的工具,运行连接器,实现与外部系统交互的自定义逻辑。在本快速入门中,我们将了解如何使用简单的连接器运行Kafka Connect,这些连接器将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。
|
||||
|
||||
(1)首先创建一些种子数据进行测试:
|
||||
|
||||
1
|
||||
[root@along ~]# echo -e "foo\nbar" > test.txt
|
||||
或者在Windows上:
|
||||
|
||||
1
|
||||
2
|
||||
> echo foo> test.txt
|
||||
> echo bar>> test.txt
|
||||
|
||||
|
||||
(2)接下来,启动两个以独立模式运行的连接器,这意味着它们在单个本地专用进程中运行。提供三个配置文件作为参数。
|
||||
|
||||
第一个始终是Kafka Connect流程的配置,包含常见配置,例如要连接的Kafka代理和数据的序列化格式。
|
||||
其余配置文件均指定要创建的连接器。这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
[root@along ~]# connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
|
||||
[2019-01-16 16:16:31,884] INFO Kafka Connect standalone worker initializing ... (org.apache.kafka.connect.cli.ConnectStandalone:67)
|
||||
[2019-01-16 16:16:31,903] INFO WorkerInfo values:
|
||||
... ...
|
||||
注:Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是源连接器,它从输入文件读取行并生成每个Kafka主题,第二个是宿连接器从Kafka主题读取消息并将每个消息生成为输出文件中的一行。
|
||||
|
||||
|
||||
|
||||
(3)验证是否导入成功(另起终端)
|
||||
|
||||
在启动过程中,您将看到许多日志消息,包括一些指示正在实例化连接器的日志消息。
|
||||
|
||||
① 一旦Kafka Connect进程启动,源连接器应该开始从test.txt主题读取行并将其生成到主题connect-test,并且接收器连接器应该开始从主题读取消息connect-test 并将它们写入文件test.sink.txt。我们可以通过检查输出文件的内容来验证数据是否已通过整个管道传递:
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# cat test.sink.txt
|
||||
foo
|
||||
bar
|
||||
|
||||
|
||||
② 请注意,数据存储在Kafka主题中connect-test,因此我们还可以运行控制台使用者来查看主题中的数据(或使用自定义使用者代码来处理它):
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
|
||||
{"schema":{"type":"string","optional":false},"payload":"foo"}
|
||||
{"schema":{"type":"string","optional":false},"payload":"bar"}
|
||||
|
||||
|
||||
(4)继续追加数据,验证
|
||||
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
[root@along ~]# echo Another line>> test.txt
|
||||
[root@along ~]# cat test.sink.txt
|
||||
foo
|
||||
bar
|
||||
Another line
|
||||
[root@along ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
|
||||
{"schema":{"type":"string","optional":false},"payload":"foo"}
|
||||
{"schema":{"type":"string","optional":false},"payload":"bar"}
|
||||
{"schema":{"type":"string","optional":false},"payload":"Another line"}
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
* [原文链接 ](https://www.liwenzhou.com/posts/Go/go_kafka/)
|
||||
* [视频:150-152](https://www.bilibili.com/video/BV17Q4y1P7n9?p=150)
|
||||
> 参考文献
|
||||
> * [原文链接 ](https://www.liwenzhou.com/posts/Go/go_kafka/)
|
||||
|
||||
|
||||
|
||||
**Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据,具有高性能、持久化、多副本备份、横向扩展等特点**。本文介绍了如何使用 Go 语言发送和接收 kafka 消息。
|
||||
|
||||
## 30.1 sarama
|
||||
## 1 sarama
|
||||
|
||||
Go 语言中连接 kafka 使用第三方库: [github.com/Shopify/sarama](https://github.com/Shopify/sarama)。
|
||||
|
||||
@@ -25,7 +26,7 @@ exec: "gcc":executable file not found in %PATH%
|
||||
|
||||
所以在 Windows 平台请使用 v1.19 版本的 sarama 。
|
||||
|
||||
## 30.2 连接 kafka 发送消息
|
||||
## 2 连接 kafka 发送消息
|
||||
|
||||
```go
|
||||
package main
|
||||
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-08-11.png
Normal file
|
After Width: | Height: | Size: 71 KiB |
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-11-17.png
Normal file
|
After Width: | Height: | Size: 78 KiB |
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-12-29.png
Normal file
|
After Width: | Height: | Size: 102 KiB |
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-16-35.png
Normal file
|
After Width: | Height: | Size: 147 KiB |
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-18-27.png
Normal file
|
After Width: | Height: | Size: 115 KiB |
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-24-04.png
Normal file
|
After Width: | Height: | Size: 70 KiB |
BIN
Go/5 GoWeb开发/kafka/image/2021-09-03-08-24-20.png
Normal file
|
After Width: | Height: | Size: 87 KiB |
@@ -1,24 +1,24 @@
|
||||
[原文链接](https://www.liwenzhou.com/posts/Go/go_redis/)、
|
||||
[视频地址](https://www.bilibili.com/video/BV17Q4y1P7n9?p=140)
|
||||
|
||||
在项目开发中 redis 的使用也比较频繁,本文介绍了 Go 语言中 `go-redis` 库的基本使用。
|
||||
> 参考文献
|
||||
> * [原文链接](https://www.liwenzhou.com/posts/Go/go_redis/)
|
||||
|
||||
## 25.1 Redis 介绍
|
||||
|
||||
## 1 Redis 介绍
|
||||
|
||||
**`Redis` 是一个开源的内存数据库**,Redis 提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,**通过复制、持久化和客户端分片等特性,我们可以很方便地将 Redis 扩展成一个能够包含数百 GB 数据、每秒处理上百万次请求的系统**。
|
||||
|
||||
### 25.1.1 Redis 支持的数据结构
|
||||
### 1.1 Redis 支持的数据结构
|
||||
|
||||
Redis 支持诸如 字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)、带范围查询的排序集合(sorted sets)、位图(bitmaps)、hyperloglogs、带半径查询和 流的地理空间索引等数据结构(geospatial indexes)。
|
||||
|
||||
### 25.1.2 Redis 应用场景
|
||||
### 1.2 Redis 应用场景
|
||||
|
||||
* 缓存系统,减轻主数据库(MySQL)的压力。
|
||||
* 计数场景,比如微博、抖音中的关注数和粉丝数。
|
||||
* 热门排行榜,需要排序的场景特别适合使用 `ZSET`。
|
||||
* 利用 `LIST` 可以实现队列的功能。
|
||||
|
||||
### 25.1.3 准备 Redis 环境
|
||||
### 1.3 准备 Redis 环境
|
||||
|
||||
这里直接使用 `Docker` 启动一个 redis 环境,方便学习使用。
|
||||
|
||||
@@ -36,9 +36,9 @@ docker run --name redis507 -p 6379:6379 -d redis:5.0.7
|
||||
docker run -it --network host --rm redis:5.0.7 redis-cli
|
||||
```
|
||||
|
||||
## 25.2 `go-redis` 库
|
||||
## 2 `go-redis` 库
|
||||
|
||||
### 25.2.1 安装
|
||||
### 2.1 安装
|
||||
|
||||
区别于另一个比较常用的 Go 语言 redis client 库:[`redigo`](https://github.com/gomodule/redigo) ,我们这里采用 [https://github.com/go-redis/redis](https://github.com/go-redis/redis) 连接 Redis 数据库并进行操作,因为**`go-redis` 支持连接哨兵及集群模式的Redis**。
|
||||
|
||||
@@ -48,9 +48,9 @@ docker run -it --network host --rm redis:5.0.7 redis-cli
|
||||
go get -u github.com/go-redis/redis
|
||||
```
|
||||
|
||||
### 25.2.2 连接
|
||||
### 2.2 连接
|
||||
|
||||
#### 25.2.2.1 普通连接
|
||||
#### 2.2.1 普通连接
|
||||
|
||||
```go
|
||||
// 声明一个全局的rdb变量
|
||||
@@ -78,7 +78,7 @@ func initClient() (err error) {
|
||||
rdb.Ping(context.TODO())
|
||||
```
|
||||
|
||||
#### 25.2.2.2 连接 Redis 哨兵模式
|
||||
#### 2.2.2 连接 Redis 哨兵模式
|
||||
|
||||
```go
|
||||
func initClient()(err error){
|
||||
@@ -94,7 +94,7 @@ func initClient()(err error){
|
||||
}
|
||||
```
|
||||
|
||||
#### 25.2.2.3 连接 Redis 集群
|
||||
#### 2.2.3 连接 Redis 集群
|
||||
|
||||
```go
|
||||
func initClient()(err error){
|
||||
@@ -109,9 +109,9 @@ func initClient()(err error){
|
||||
}
|
||||
```
|
||||
|
||||
### 25.2.3 基本使用
|
||||
### 2.3 基本使用
|
||||
|
||||
#### 25.2.3.1 set/get 示例
|
||||
#### 2.3.1 set/get 示例
|
||||
|
||||
```go
|
||||
func redisExample() {
|
||||
@@ -140,7 +140,7 @@ func redisExample() {
|
||||
}
|
||||
```
|
||||
|
||||
#### 25.2.3.2 zset示例
|
||||
#### 2.3.2 zset示例
|
||||
|
||||
```go
|
||||
func redisExample2() {
|
||||
@@ -209,7 +209,7 @@ C/C++ 99
|
||||
Golang 100
|
||||
```
|
||||
|
||||
#### 25.2.3.3 Pipeline
|
||||
#### 2.3.3 Pipeline
|
||||
|
||||
`Pipeline` 主要**是一种网络优化**。它本质上意味着**客户端缓冲一堆命令并一次性将它们发送到服务器**。这些命令**不能保证在事务中执行**。这样做的好处是节省了每个命令的 **网络往返时间(`RTT`**)。
|
||||
|
||||
@@ -246,7 +246,7 @@ fmt.Println(incr.Val(), err)
|
||||
|
||||
在某些场景下,当我们有多条命令要执行时,就可以考虑使用 pipeline 来优化。
|
||||
|
||||
#### 25.2.3.4 事务
|
||||
#### 2.3.4 事务
|
||||
|
||||
**Redis 是单线程的,因此单个命令始终是原子的**,但是来自不同客户端的两个给定命令可以依次执行,例如在它们之间交替执行。但是,`Multi/exec` 能够确保在 `multi/exec` 两个语句之间的命令之间没有其他客户端正在执行命令。
|
||||
|
||||
@@ -284,7 +284,7 @@ fmt.Println(incr.Val(), err)
|
||||
```
|
||||
|
||||
|
||||
#### 25.2.3.5 Watch
|
||||
#### 2.3.5 Watch
|
||||
|
||||
在某些场景下,我们除了要使用 `MULTI/EXEC` 命令外,还需要配合使用 `WATCH` 命令。在用户使用 `WATCH` 命令监视某个键之后,直到该用户执行 `EXEC` 命令的这段时间里,如果有其他用户抢先对被监视的键进行了替换、更新、删除等操作,那么当用户尝试执行 EXEC 的时候,事务将失败并返回一个错误,用户可以根据这个错误选择重试事务或者放弃事务。
|
||||
|
||||
|
||||
@@ -24,14 +24,17 @@
|
||||
### 知识复习——基础
|
||||
* [ ] 操作系统
|
||||
* [ ] 基础知识
|
||||
* [ ] 关于同步异步机制的理解。总结各种语言、库、中间件、设计模式、数据库、操作系统的同步异步方式。
|
||||
* [ ] Linux底层原理和常见函数
|
||||
* [x] 数据库
|
||||
* [x] 基础知识
|
||||
* [x] MySQL
|
||||
* [x] Redis
|
||||
* [ ] Redis的总结
|
||||
* [ ] Redis实践
|
||||
* [ ] 消息队列
|
||||
* [x] Redis的总结
|
||||
* [x] Redis实践
|
||||
* [x] 消息队列定义
|
||||
* [x] kafka消息队列
|
||||
* [x] redis实现消息队列
|
||||
* [ ] 计算机网络
|
||||
* [ ] 应用层
|
||||
* [ ] 网络层
|
||||
|
||||
@@ -8,9 +8,9 @@ Redis服务器是一个事件驱动程序,需要处理以下两类事件:
|
||||
|
||||
## 1 文件事件
|
||||
### 原理
|
||||
Redis基于Reactor模式开发了自己的网络事件处理器,称为『文件事件处理器』,文件事件处理器以单线程方式运行。
|
||||
Redis基于Reactor模式开发了自己的网络事件处理器,称为文件事件处理器,文件事件处理器以单线程方式运行。
|
||||
|
||||
Redis 基于 Reactor 模式开发了自己的网络事件处理器,使用 I/O 多路复用程序来同时监听多个套接字,并将到达的事件传送给文件事件分派器,分派器会根据套接字产生的事件类型调用相应的事件处理器。
|
||||
Redis **基于Reactor 模式开发了自己的网络事件处理器**,使用 **I/O 多路复用程序来同时监听多个套接字**,并将到达的事件传送给文件事件分派器,分派器会根据套接字产生的事件类型调用相应的事件处理器。
|
||||
|
||||

|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ Redis 这种内存型数据库的读写性能非常高,很适合存储频繁
|
||||
|
||||
List 是一个双向链表,可以通过 lpush 和 rpop 写入和读取消息
|
||||
|
||||
不过最好使用 Kafka、RabbitMQ 等消息中间件。
|
||||
不过最好使用 Kafka、RabbitMQ等消息中间件。
|
||||
|
||||
### 会话缓存
|
||||
|
||||
|
||||
302
数据库/Redis/16 Redis消息队列.md
Normal file
@@ -0,0 +1,302 @@
|
||||
> 参考文献
|
||||
> * [redis做消息队列](https://www.zhihu.com/question/20795043)
|
||||
|
||||

|
||||
|
||||
|
||||

|
||||
## 1 List队列
|
||||
|
||||
|
||||
List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。如果把 List 当作队列,你可以这么来用。
|
||||
|
||||
### 读非阻塞的消息队列
|
||||
|
||||

|
||||
|
||||
|
||||
* 生产者使用 LPUSH 发布消息:
|
||||
|
||||
```
|
||||
127.0.0.1:6379> LPUSH queue msg1
|
||||
(integer) 1
|
||||
127.0.0.1:6379> LPUSH queue msg2
|
||||
(integer) 2
|
||||
```
|
||||
|
||||
* 消费者这一侧,使用 RPOP 拉取消息:
|
||||
|
||||
```
|
||||
127.0.0.1:6379> RPOP queue
|
||||
"msg1"
|
||||
127.0.0.1:6379> RPOP queue
|
||||
"msg2"
|
||||
```
|
||||
|
||||
|
||||
* 但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。
|
||||
```
|
||||
127.0.0.1:6379> RPOP queue
|
||||
(nil) // 没消息了
|
||||
```
|
||||
|
||||
* 而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:
|
||||
```
|
||||
while true:
|
||||
msg = redis.rpop("queue")
|
||||
// 没有消息,继续循环
|
||||
if msg == null:
|
||||
continue
|
||||
// 处理消息
|
||||
handle(msg)
|
||||
```
|
||||
|
||||
* 如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。怎么解决这个问题呢?
|
||||
* 也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:
|
||||
```
|
||||
while true:
|
||||
msg = redis.rpop("queue")
|
||||
// 没有消息,休眠2s
|
||||
if msg == null:
|
||||
sleep(2)
|
||||
continue
|
||||
// 处理消息
|
||||
handle(msg)
|
||||
```
|
||||
|
||||
* 这就解决了 CPU 空转问题。这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。
|
||||
|
||||
|
||||
### 读阻塞式消息队列
|
||||
|
||||

|
||||
|
||||
* 那如何做,既能及时处理新消息,还能避免 CPU 空转呢?Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?
|
||||
* 幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。
|
||||
|
||||
```
|
||||
while true:
|
||||
// 没消息阻塞等待,0表示不设置超时时间
|
||||
msg = redis.brpop("queue", 0)
|
||||
if msg == null:
|
||||
continue
|
||||
// 处理消息
|
||||
handle(msg)
|
||||
```
|
||||
|
||||
解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?我们一起来分析一下:
|
||||
|
||||
* 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
|
||||
* 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了
|
||||
|
||||
|
||||
## 2 发布订阅模型
|
||||
|
||||
它正好可以解决前面提到的第一个问题:重复消费。即多组生产者、消费者的场景,我们来看它是如何做的。Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
|
||||
|
||||
|
||||
### 简单订阅发布机制
|
||||
|
||||

|
||||
|
||||
假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。
|
||||
```
|
||||
// 2个消费者 都订阅一个队列
|
||||
127.0.0.1:6379> SUBSCRIBE queue
|
||||
Reading messages... (press Ctrl-C to quit)
|
||||
1) "subscribe"
|
||||
2) "queue"
|
||||
3) (integer) 1
|
||||
```
|
||||
|
||||
此时,2 个消费者都会被阻塞住,等待新消息的到来。之后,再启动一个生产者,发布一条消息。
|
||||
```
|
||||
127.0.0.1:6379> PUBLISH queue msg1
|
||||
(integer) 1
|
||||
```
|
||||
|
||||
这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。
|
||||
```
|
||||
127.0.0.1:6379> SUBSCRIBE queue
|
||||
// 收到新消息
|
||||
1) "message"
|
||||
2) "queue"
|
||||
3) "msg1"
|
||||
```
|
||||
|
||||
看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。
|
||||
|
||||
### 订阅多个发布者
|
||||
|
||||

|
||||
|
||||
除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。
|
||||
|
||||
```
|
||||
// 订阅符合规则的队列
|
||||
127.0.0.1:6379> PSUBSCRIBE queue.*
|
||||
Reading messages... (press Ctrl-C to quit)
|
||||
1) "psubscribe"
|
||||
2) "queue.*"
|
||||
3) (integer) 1
|
||||
```
|
||||
|
||||
这里的消费者,订阅了 queue.* 相关的队列消息。之后,生产者分别向 queue.p1 和 queue.p2 发布消息。
|
||||
```
|
||||
127.0.0.1:6379> PUBLISH queue.p1 msg1
|
||||
(integer) 1
|
||||
127.0.0.1:6379> PUBLISH queue.p2 msg2
|
||||
(integer) 1
|
||||
```
|
||||
|
||||
这时再看消费者,它就可以接收到这 2 个生产者的消息了。
|
||||
```
|
||||
127.0.0.1:6379> PSUBSCRIBE queue.*
|
||||
Reading messages... (press Ctrl-C to quit)
|
||||
...
|
||||
// 来自queue.p1的消息
|
||||
1) "pmessage"
|
||||
2) "queue.*"
|
||||
3) "queue.p1"
|
||||
4) "msg1"
|
||||
|
||||
// 来自queue.p2的消息
|
||||
1) "pmessage"
|
||||
2) "queue.*"
|
||||
3) "queue.p2"
|
||||
4) "msg2"
|
||||
```
|
||||
|
||||
### 完整的订阅发布流程
|
||||
一个完整的发布、订阅消息处理流程是这样的:
|
||||
|
||||
消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
|
||||
生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它
|
||||
|
||||

|
||||
|
||||
Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!
|
||||
|
||||
|
||||
### Pub/Sub 特点
|
||||
|
||||
1. 支持发布 / 订阅,支持多组生产者、消费者处理消息
|
||||
2. 消费者下线,数据会丢失
|
||||
3. 不支持数据持久化,Redis 宕机,数据也会丢失
|
||||
4. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
|
||||
|
||||
### Pub/sub List对比
|
||||
|
||||
List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型。
|
||||
|
||||
|
||||
## 3 趋于成熟的队列:Stream
|
||||
|
||||
当我们在使用一个消息队列时,希望它的功能如下:
|
||||
|
||||
1. 支持阻塞等待拉取消息
|
||||
1. 支持发布 / 订阅模式
|
||||
1. 消费失败,可重新消费,消息不丢失
|
||||
1. 实例宕机,消息不丢失,数据可持久化
|
||||
1. 消息可堆积
|
||||
|
||||
### 典型过程
|
||||
|
||||

|
||||
Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:
|
||||
|
||||
* XADD:发布消息
|
||||
* XREAD:读取消息
|
||||
|
||||
|
||||
使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。这个消息 ID 的格式是「时间戳-自增序号」。
|
||||
|
||||
生产者发布 2 条消息:
|
||||
```
|
||||
// *表示让Redis自动生成消息ID
|
||||
127.0.0.1:6379> XADD queue * name zhangsan
|
||||
"1618469123380-0"
|
||||
127.0.0.1:6379> XADD queue * name lisi
|
||||
"1618469127777-0"
|
||||
```
|
||||
|
||||
|
||||
消费者拉取消息:
|
||||
```
|
||||
// 从开头读取5条消息,0-0表示从开头读取
|
||||
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
|
||||
1) 1) "queue"
|
||||
1) 1) 1) "1618469123380-0"
|
||||
1) 1) "name"
|
||||
1) "zhangsan"
|
||||
1) 1) "1618469127777-0"
|
||||
1) 1) "name"
|
||||
1) "lisi"
|
||||
```
|
||||
|
||||
|
||||
如果想继续拉取消息,需要传入上一条消息的 ID:
|
||||
```
|
||||
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
|
||||
(nil)
|
||||
```
|
||||
没有消息,Redis 会返回 NULL。
|
||||
|
||||
|
||||
|
||||
### 其他特性
|
||||
|
||||
1) Stream 是否支持「阻塞式」拉取消息?
|
||||
|
||||
可以的,在读取消息时,只需要增加 BLOCK 参数即可。
|
||||
```
|
||||
// BLOCK 0 表示阻塞等待,不设置超时时间
|
||||
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
|
||||
```
|
||||
这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。
|
||||
|
||||
2) Stream 是否支持发布 / 订阅模式?
|
||||
|
||||
也没问题,Stream 通过以下命令完成发布订阅:
|
||||
```
|
||||
XGROUP:创建消费者组
|
||||
XREADGROUP:在指定消费组下,开启消费者拉取消息
|
||||
```
|
||||
|
||||
3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?
|
||||
|
||||
除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。
|
||||
|
||||
当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。
|
||||
```
|
||||
// group1下的 1618472043089-0 消息已处理完成
|
||||
127.0.0.1:6379> XACK queue group1 1618472043089-0
|
||||
```
|
||||
如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。
|
||||
|
||||
待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。
|
||||
```
|
||||
// 消费者重新上线,0-0表示重新拉取未ACK的消息
|
||||
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
|
||||
// 之前没消费成功的数据,依旧可以重新消费
|
||||
1) 1) "queue"
|
||||
2) 1) 1) "1618472043089-0"
|
||||
2) 1) "name"
|
||||
2) "zhangsan"
|
||||
2) 1) "1618472045158-0"
|
||||
2) 1) "name"
|
||||
2) "lisi"
|
||||
```
|
||||
|
||||
4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?
|
||||
|
||||
Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。
|
||||
|
||||
我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。
|
||||
|
||||
5) 消息堆积时,Stream 是怎么处理的?
|
||||
|
||||
其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:
|
||||
|
||||
* 生产者限流:避免消费者处理不及时,导致持续积压
|
||||
* 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息
|
||||
BIN
数据库/Redis/image/2021-09-03-09-15-05.png
Normal file
|
After Width: | Height: | Size: 142 KiB |
BIN
数据库/Redis/image/2021-09-03-09-18-36.png
Normal file
|
After Width: | Height: | Size: 45 KiB |
BIN
数据库/Redis/image/2021-09-03-09-22-19.png
Normal file
|
After Width: | Height: | Size: 41 KiB |
BIN
数据库/Redis/image/2021-09-03-09-24-24.png
Normal file
|
After Width: | Height: | Size: 66 KiB |
BIN
数据库/Redis/image/2021-09-03-09-26-34.png
Normal file
|
After Width: | Height: | Size: 64 KiB |
BIN
数据库/Redis/image/2021-09-03-09-30-04.png
Normal file
|
After Width: | Height: | Size: 74 KiB |
BIN
数据库/Redis/image/2021-09-03-09-34-18.png
Normal file
|
After Width: | Height: | Size: 39 KiB |
BIN
数据库/Redis/image/2021-09-03-09-35-40.png
Normal file
|
After Width: | Height: | Size: 122 KiB |
5
数据库/事件驱动?.txt
Normal file
@@ -0,0 +1,5 @@
|
||||
事件驱动?
|
||||
事件响应机制?
|
||||
回调机制
|
||||
异步回调机制
|
||||
并发机制:多线程并发?异步并发?异步IO?
|
||||