MQ

消息队列

消息队列相关知识01

Posted by ZBX on May 5, 2020

RabbitMQ

优点:

  • 易用的管理界面

  • 灵活的路由
  • 高可用性
  • 插件机制

概念

  1. Producer(生产者) 和 Consumer(消费者)

  2. Exchange(交换器) : 把我们的消息分配到对应的 Queue(消息队列) 中
    • 生产者将消息发给交换器的时候,一般会指定一个 RoutingKey(路由键),用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
    • 生产者将消息发送给交换器时,需要一个RoutingKey,当 BindingKey 和 RoutingKey 相匹配时,消息会被路由到对应的队列中。
    • fanout (发到所有与Exchange绑定的Queue中)、direct (路由到那Bindingkey 与 RoutingKey 完全匹配的 Queue 中)、topic ( 与direct类似,可以模糊匹配) 、 headers (根据发送的消息内容中的 headers 属性进行匹配, 不推荐)
  3. Broker(消息中间件的服务节点)

Kafka

分区策略

分区原因

  1. 方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个 topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了。
  2. 可以提高并发,因为可以以 Partition 为单位读写了。

分区原则

将producer发送的数据封装成ProducerRecord对象。下面是ProducerRecord的成员变量和全参的构造方法:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
	public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {...}
	...
}
  1. 没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到parition值;
  2. 既没有 partition 值又没有 key 值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法。

通过指定key就可以可以获取到其在topic 下的对应的partition,即可以借此实现顺序消费。

数据可靠性保证

partition 收到 producer 发送的数据后,都需要向 producer 发送 ack,如果接收到了就进行下一轮发送,否则重新发送。

ack参数配置

0:producer不等待broker的ack

1: partition 的leader落盘成功后返回ack,

-1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack。

何时发送ack?

确保有follower与leader同步完成,leader再发送,这样才能保证即使leader挂掉之后,能在follower中选举出新的leader

多少个follower同步完成之后发送ack?

方案 优点 缺点
半数以上同步 延迟低 选举新leader时,容忍n台节点故障,需要2n+1个副本
全部完成同步 选举新leader时,容忍n台节点故障,需要n+1个副本 延迟高

选择第二种方案,原因如下:

  1. 同样为了容忍 n 台节点的故障,第一种方案需要 2n+1 个副本,而第二种方案只需要 n+1个副本,而 Kafka 的每个分区都有大量的数据,第一种方案会造成大量数据的冗余。
  2. 虽然第二种方案的网络延迟会比较高,但网络延迟对 Kafka 的影响较小。

ISR

采用第二种方案之后,设想以下情景:leader 收到数据,所有 follower 都开始同步数据, 但有一个 follower,因为某种故障,迟迟不能与 leader 进行同步,那 leader 就要一直等下去, 直到它完成同步,才能发送 ack。这个问题怎么解决呢? Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 follower 发送 ack。如果 follower 长时间未向 leader同步数据,则 该 follower 将被踢出ISR,该时间阈值由replica.lag.time.max.ms 参数设定。Leader发送故障之后,就会从ISR中选举新的leader。

如何保证消息队列的高可用

  1. rabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式

  2. kafka的高可用性

    1. 多个broker组成,每个broker是一个节点;你创建一个topic,这个topic可以划分为多个partition,每个partition可以存在于不同的broker上,每个partition就放一部分数据。
    2. replica副本机制。每个partition的数据都会同步到其他机器上,形成自己的多个replica副本。

如何保证消息不被重复消费

  1. kafka中,每个消息写进去,都有一个offset来代表它的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了。
  2. 需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。

如何保证消息的可靠性传输

RabbitMQ

  1. 生产者丢数据
    1. 如果你要确保说写rabbitmq的消息别丢,可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
  2. rabbitmq弄丢了数据
    1. 开启rabbitmq的持久化
  3. 消费端弄丢了数据
    1. 关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。

Kafka

  1. 生产者通过acks
  2. kafka弄丢了数据
    1. 某个broker宕机,然后重新选举partiton的leader时
  3. 消费端弄丢了数据
    1. 只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。

如何保证消息的顺序性

  • 关于顺序消费的几点说明:
    • Kakfa的顺序消费仅仅是通过Key,将某类消息写入同一个partition,一个partition只能对应一个消费线程,以保证数据有序。

    • 除了发消息需要指定partitionKey外,producer和consumer实例化无区别

    • broker宕机,kafka会有自选择,所以宕机不会减少partition数量,也就不会影响partitionKey的sharding。

  • 怎么保证在一个topic内的多个partition的顺序消费?
    • producer写的时候指定一个key,相同key的数据会分发到同一partition中去,而且这个partition中的数据一定是有顺序的。
    • consumer从partition中取出数据的时候,也一定是有顺序的。
    • consumer中多个线程来并发处理消息,因为单线程太难了,多线程又不能保证顺序消费。
    • 方案: 写 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue;然后对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

有几百万消息持续积压几小时,说说怎么解决

  1. 先修复consumer的问题,确保其恢复消费速度,然后将现有consumer都停掉

  2. 新建一个topic,partition是原来的10倍,临时建立好原先10倍或者20倍的queue数量

  3. 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的10倍数量的queue

  4. 接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据

Kafka高效写数据

  1. 顺序写磁盘
  2. 零拷贝技术
  3. page buffer
  4. nio
  5. partition

Kafka的Consumer Group

消费者组,由多个consumer组成。消费者者内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费,消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

为什么?

首先group是为了提交消息的吞吐量,避免重复消费。

设计消息队列系统

  1. mq得支持可伸缩性
  2. 持久化操作
  3. 高可用
  4. 防止数据的丢失