RabbitMQ
优点:
-
易用的管理界面
- 灵活的路由
- 高可用性
- 插件机制
概念
-
Producer(生产者) 和 Consumer(消费者)
- Exchange(交换器) : 把我们的消息分配到对应的 Queue(消息队列) 中
- 生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
- 生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。
- fanout (发到所有与Exchange绑定的Queue中)、direct (路由到那Bindingkey 与 RoutingKey 完全匹配的 Queue 中)、topic ( 与direct类似,可以模糊匹配) 、 headers (根据发送的消息内容中的 headers 属性进行匹配, 不推荐)
- Broker(消息中间件的服务节点)
Kafka
分区策略
分区原因
- 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了。
- 可以提高并发,因为可以以 Partition 为单位读写了。
分区原则
将producer发送的数据封装成ProducerRecord对象。下面是ProducerRecord的成员变量和全参的构造方法:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {...}
...
}
- 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到parition值;
- 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。
通过指定key就可以可以获取到其在topic 下的对应的partition,即可以借此实现顺序消费。
数据可靠性保证
partition 收到 producer 发送的数据后,都需要向 producer 发送 ack,如果接收到了就进行下一轮发送,否则重新发送。
ack参数配置
0:producer不等待broker的ack
1: partition 的leader落盘成功后返回ack,
-1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。
何时发送ack?
确保有follower与leader同步完成,leader再发送,这样才能保证即使leader挂掉之后,能在follower中选举出新的leader
多少个follower同步完成之后发送ack?
| 方案 | 优点 | 缺点 |
|---|---|---|
| 半数以上同步 | 延迟低 | 选举新leader时,容忍n台节点故障,需要2n+1个副本 |
| 全部完成同步 | 选举新leader时,容忍n台节点故障,需要n+1个副本 | 延迟高 |
选择第二种方案,原因如下:
- 同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
- 虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。
ISR
采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据, 但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去, 直到它完成同步,才能发送 ack。这个问题怎么解决呢? Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间未向 leader同步数据,则 该 follower 将被踢出ISR,该时间阈值由replica.lag.time.max.ms 参数设定。Leader发送故障之后,就会从ISR中选举新的leader。
如何保证消息队列的高可用
-
rabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式
-
kafka的高可用性
- 多个broker组成,每个broker是一个节点;你创建一个topic,这个topic可以划分为多个partition,每个partition可以存在于不同的broker上,每个partition就放一部分数据。
- replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的多个replica副本。
如何保证消息不被重复消费
- kafka中,每个消息写进去,都有一个offset来代表它的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。
- 需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。
如何保证消息的可靠性传输
RabbitMQ
- 生产者丢数据
- 如果你要确保说写rabbitmq的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
- rabbitmq弄丢了数据
- 开启rabbitmq的持久化
- 消费端弄丢了数据
- 关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。
Kafka
- 生产者通过acks
- kafka弄丢了数据
- 某个broker宕机,然后重新选举partiton的leader时
- 消费端弄丢了数据
- 只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。
如何保证消息的顺序性
- 关于顺序消费的几点说明:
-
Kakfa的顺序消费仅仅是通过Key,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。
-
除了发消息需要指定partitionKey外,producer和consumer实例化无区别
-
broker宕机,kafka会有自选择,所以宕机不会减少partition数量,也就不会影响partitionKey的sharding。
-
- 怎么保证在一个topic内的多个partition的顺序消费?
- producer写的时候指定一个key,相同key的数据会分发到同一partition中去,而且这个partition中的数据一定是有顺序的。
- consumer从partition中取出数据的时候,也一定是有顺序的。
- consumer中多个线程来并发处理消息,因为单线程太难了,多线程又不能保证顺序消费。
- 方案: 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。
有几百万消息持续积压几小时,说说怎么解决
-
先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉
-
新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量
-
然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue
-
接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
Kafka高效写数据
- 顺序写磁盘
- 零拷贝技术
- page buffer
- nio
- partition
Kafka的Consumer Group
消费者组,由多个consumer组成。消费者者内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
为什么?
首先group是为了提交消息的吞吐量,避免重复消费。
设计消息队列系统
- mq得支持可伸缩性
- 持久化操作
- 高可用
- 防止数据的丢失