侧边栏壁纸
  • 累计撰写 15 篇文章
  • 累计创建 8 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

RocketMQ

王富贵
2024-04-11 / 0 评论 / 1 点赞 / 72 阅读 / 0 字

1、RocketMQ

1.1、RocketMQ简介

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会 作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛 的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

github官方

rocketMQ官方文档

理论基础笔记

实战笔记

MQ的优劣势

优势:

  • 应用解耦
  • 异步提速
  • 削峰填谷
  • 分布式事务

劣势:

  • 系统可用性降低
  • 系统复杂度提高
  • 一致性问题
    • 消息顺序性
    • 消息丢失
    • 消息一致性
    • 消息重复使用

应用场景描述

异步提速

1

分布式事务

29

常用队列对比

2

  1. ActiveMQ

    java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

  2. RabbitMQ

    erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,

  3. RocketMQ

    java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强大,扩展性强

  4. kafka

    scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

1.2、RockedMQ核心概念

1.2.1、服务概念

4

Name Server:

主要负责对于源数据的管理,包括了对于Topic和路由信息的管理。

  1. 是一个功能齐全的服务器,其角色类似Dubbo中的Zookeeper,但NameServer与Zookeeper相比更轻量。主要是因为每个NameServer节点互相之间是独立的,没有任何信息交互。
  2. 压力不会太大,平时主要开销是在维持心跳和提供Topic-Broker的关系数据。但有一点需要注意,Broker向NameServer发心跳时, 会带上当前自己所负责的所有Topic信息,如果Topic个数太多(万级别),会导致一次心跳中,就Topic的数据就几十M,网络情况差的话, 网络传输失败,心跳失败,导致NameServer误认为Broker心跳失败。
  3. 被设计成几乎无状态的,可以横向扩展,节点之间相互之间无通信,通过部署多台机器来标记自己是一个伪集群。

每个 Broker 在启动的时候会到 NameServer 注册,Producer 在发送消息前会根据 Topic 到 NameServer 获取到 Broker 的路由信息,Consumer 也会定时获取 Topic 的路由信息。

所以从功能上看NameServer应该是和 ZooKeeper 差不多,据说 RocketMQ 的早期版本确实是使用的 ZooKeeper ,后来改为了自己实现的 NameServer 。

Broker

消息中转角色,负责存储消息,转发消息。

  1. Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,顺带一提底层的通信和连接都是基于Netty实现的。
  2. Broker负责消息存储,以Topic为纬度支持轻量级的队列,单机可以支撑上万队列规模,支持消息推拉模型。
  3. 官网上有数据显示:具有上亿级消息堆积能力,同时可严格保证消息的有序性

生产者

消息生产者,负责产生消息,一般由业务系统负责产生消息。

  1. 由用户进行分布式部署,消息由生产者通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
  2. RocketMQ 提供了三种方式发送消息:同步、异步和单向
    1. 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
    2. 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
    3. 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集。

消费者

消息消费者,负责消费消息,一般是后台系统负责异步消费。

  1. Consumer也由用户部署,支持PUSH和PULL两种消费模式,支持集群消费广播消息,提供实时的消息订阅机制
  2. Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型。
  3. Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息。

1.2.2、消息领域名词

Message(消息)

就是要传输的信息

一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。

一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。

Topic(主题)

  1. 可以看做消息的规类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息、物流消息等,一条消息必须有一个 Topic 。
  2. 与生产者和消费者的关系非常松散,一个 Topic 可以有0个、1个、多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。
  3. 一个 Topic 也可以被 0个、1个、多个消费者订阅。

Tag(标签)

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag

标签有助于保持您的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

Group(组)

分组,一个组可以订阅多个Topic。

分为ProducerGroup,ConsumerGroup,代表某一类的生产者和消费者,一般来说同一个服务可以作为Group,同一个Group一般来说发送和消费的消息都是一样的

Message Queue(消息队列)

主题被划分为一个或多个子主题,即消息队列。

一个 Topic 下可以设置多个消息队列,发送消息时执行该消息的 Topic ,RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。

消息的物理管理单位。一个Topic下可以有多个Queue,Queue的引入使得消息的存储可以分布式集群化,具有了水平扩展能力。

消息消费模式

消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)。

默认情况下就是集群消费,该模式下一个消费者集群共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。

而广播消费消息会发给消费者组中的每一个消费者进行消费。

消息顺序(Message Order)

Message Order(消息顺序)有两种:Orderly(顺序消费)和Concurrently(并行消费)。

顺序消费表示消息消费的顺序同生产者为每个消息队列发送的顺序一致,所以如果正在处理全局顺序是强制性的场景,需要确保使用的主题只有一个消息队列。

并行消费不再保证消息顺序,消费的最大并行数量受每个消费者客户端指定的线程池限制。

1.2.3、消费模型

7

8

1.3、RocketMQ搭建

下载rocketMQ

搭建RocketMQ需要启动Name Server用于服务发现,启动Broker用于消息接收和发送

1.3.1、linux搭建

docker搭建RocketMQ

1.3.2、win搭建

1.添加环境变量

环境变量的系统变量中,添加一下键值对

变量名:ROCKETMQ_HOME

变量值:MQ解压路径\MQ文件夹名
例:C:\Users\Administrator\Desktop\rocketmq-all-4.6.1-bin-release

2.启动name-server

进入rocketmq/bin,启动name-server,端口9876

start mqnamesrv.cmd

3.启动broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

4.常见错误

RocketMq启动报错错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_301\jre\lib\ext

如果闪退, 删除C:\Users\”当前系统用户名”\store下的所有文件

1.3.3、控制台安装

控制台github源码

1.下载rocketmq源码

git clone https://github.com/apache/rocketmq-dashboard.git

2.进入rocketmq-console工程,编译源码

mvn clean package -Dmaven.test.skip=true

会在target目录下生成jar包

如果出现下载错误,可能跟开了网络代理有关系

3.运行jar包

双击或者命令行运行即可

java -jar rocketmq-dashboard-1.0.1-SNAPSHOT.jar

1.4、消息的发送

1.4.1、基础发送程序

1.4.1.1、一对一

单生产者对单消费者

生产者

public class OneToOne {
    public static void main(String[] args) throws Exception {
        //1.创建一个发送消息的对象producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.设定发送的服务器地址
        producer.setNamesrvAddr("10.70.192.36:9876");
        //3.启动发送服务
        producer.start();
        //4.创建要发送的消息对象,指定topic,指定内容body
        Message msg = new Message("topic1", "hello rocketmq".getBytes("UTF-8"));
        //5.发送消息
        SendResult result = producer.send(msg);
        //6.关闭连接
        producer.shutdown();
    }
}

消费者

public class OneToOne {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收对象consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("10.70.192.36:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic1", "*");
        //4.开启监听用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt msg : list) {
                    System.out.println("收到消息:" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动接收消息服务
        consumer.start();
        System.out.println("接收消息服务已开启");
    }
}

1.4.1.2、一对多

一个生产者对应多个消费者。

此消息分为负载均衡模式和广播模式

  1. 负载均衡模式:使用负载均衡的方式发送给某一个订阅的消费者。
    • 代码实现:consumer.setMessageModel(MessageModel.CLUSTERING);
  2. 广播模式:但如果我们是为了做异步加速,可能同一个消息就要发送给所有订阅的模块。
    • 代码实现:consumer.setMessageModel(MessageModel.BROADCASTING);
1.4.1.2.1、生产者
public class OneToMany {
    public static void main(String[] args) throws Exception {
        //1.创建生产者
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.设置服务器
        producer.setNamesrvAddr("10.70.192.36:9876");
        //3.1启动发送的服务
        producer.start();

        ArrayList<SendResult> sendResults = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            //4.创建要发送的消息对象,指定topic,指定内容body
            Message msg = new Message("topic1", ("hello rocketmq" + i).getBytes("UTF-8"));
            //3.2发送消息
            SendResult result = producer.send(msg);
            sendResults.add(result);
        }
        //5.关闭连接
        producer.shutdown();
    }
}
1.4.1.2.2、消费者
1.4.1.2.2.1、负载均衡
public class OneToMany {
    public static void main(String[] args) throws Exception {
        //1.创建一个消费者模型
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic1","*");
        //设置当前消费者的消费模式(默认模式:负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt msg : list) {
                    System.out.println("收到消息:"+msg);
                    System.out.println("消息是:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接受消息服务已经开启!");

        //5 不要关闭消费者!
    }
}
1.4.1.2.2.2、广播模式
public class OneToMany广播模式 {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic1","*");
        //设置当前消费者的消费模式(广播模式)
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt msg : list) {
                    System.out.println("收到消息:"+msg);
                    System.out.println("消息是:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接受消息服务已经开启!");

        //5 不要关闭消费者!
    }
}

1.4.2、发送消息类型

生产者发送消息的类型分为同步发送,异步发送和单向发送

1.4.2.1、同步发送

同步消息:是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。

调用方法:

DefaultMQProducer.send()

应用:重要的通知消息、短信通知、短信营销系统

1.4.2.2、异步发送

异步发送:是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 消息发送方在发送了一条消息后,不需要等待服务器响应即可返回, 进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。

调用方法:MQ的异步发送,需要用户实现异步发送回调接口(SendCallback)。

应用:异步发送通常被用于对响应时间敏感的业务场景

//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("localhost:9876");
//3.1启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
    //4.创建要发送的消息对象,指定topic,指定内容body
    Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8"));
    //3.2 同步消息
    //SendResult result = producer.send(msg);
    //System.out.println("返回结果:" + result);

    //异步消息
    producer.send(msg, new SendCallback() {
        //表示成功返回结果
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }
        //表示发送消息失败
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable);
        }
    });

    System.out.println("消息"+i+"发完了,做业务逻辑去了!");
}
//休眠10秒
TimeUnit.SECONDS.sleep(10);
//5.关闭连接
producer.shutdown();

1.4.2.3、单向发送

单向(Oneway)发送:发送方只负责发送消息,不等待服务器回应且没有回调函数触发, 即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。

调用方法:

DefaultMQProducer.sendOneway()

应用:单向发送用于要求一定可靠性的场景,例如日志收集

1.4.2.4、三者区别

发送方式发送TPS发送结果反馈可靠性
同步发送不丢失
异步发送很快不丢失
单向发送最快可能丢失

1.4.3、特殊消息发送

1.4.3.1、延时消息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);

目前支持的消息时间

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

1.4.3.2、批量消息

批量发送消息能显著提高传递小消息的性能.

发送批量消息:

List<Message> msgList = new ArrayList<Message>();
        Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
        Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
        Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));

        msgList.add(msg1);
        msgList.add(msg2);
        msgList.add(msg3);


        SendResult result = producer.send(msgList);

在发送批量消息的时候应该注意:

  1. 这些批量消息应该有相同的topic
  2. 相同的waitStoreMsgOK
  3. 不能是延时消息
  4. 消息内容总长度不超过4M

消息内容总长度包含如下:

  • topic(字符串字节数)
  • body (字节数组长度)
  • 消息追加的属性(key与value对应字符串字节数)
  • 日志(固定20字节)

1.4.4、消息过滤

在我们接收消息的时候(消费者),可以通过四种方式过滤接收的消息

  1. 分类过滤(tag)
  2. 语法过滤

1.4.4.1、分类过滤

我们可以通过tag(分类)来过滤接收消息

生产者

我们给消息设置类别为tag2

Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));

消费者

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

1.4.4.2、语法过滤

在消息中我们可以添加属性。我们可以通过对消息属性的过滤。这也类似于SQL语句

基本语法

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

生产者

//消息追加属性
msg.putUserProperty("name","坏银");
msg.putUserProperty("age", String.valueOf(18));

消费者

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

重启broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

或者直接cmd中输入

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

我们可以在服务端页面查看

10

1.5、SpringBoot整合

我们可以是用boot整合的template,用来简化我们的发送与接收步骤

导入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.0</version>
</dependency>

1.5.1、生产者

1.配置文件

在配置文件中,我们可以配置生产者或者消费者的一些相关信息,包括服务地址和分组,这样我们就不需要在代码中重复的new出来

rocketmq:
  name-server: localhost:8765 # 服务列表
  producer:
    group: demo_producer # 生产者组

2.发送消息

我们可以通过SpringBoot的官方工具RocketMQTemplate来进行发送, 他帮我们封装了很多方法

@RestController
public class DemoProducers {
    @Autowired
    private RocketMQTemplate template;

    @RequestMapping("/send")
    public String producersMessage() {
        User user = new User("hy", "10086");
        //convertAndSend是转换并发送的意思,我们可以发送任何类型的消息,他都可以帮我们转换
        template.convertAndSend("demo-topic", user);
        //如果消息很复杂,我们也可以发送Message类型的消息
        //Message message = new Message("topic-demo", "tags-demo", "发送了消息".getBytes(StandardCharsets.UTF_8));
        return "success";
    }
}

这里我们的一个核心方法

rocketMQTemplate.convertAndSend()

convertAndSend的意思就是转化并发送,他可以将我们传入的任何类,String、Obj等等,转化成字节码的形式传输,这样就不需要我们手动的将其转化成为字节码的形式了

3.携带标签(tag)

即主题与标签使用冒号隔开

SendResult result = rocketMQTemplate.syncSend("topic1:tag1", paload);

1.5.2、消费者

1.配置服务地址,用于发现rocketmq

rocketmq:
  name-server: 10.80.168.18:9876 # 服务列表

2.配置监听器

@Component //boot启动时加载
@RocketMQMessageListener(consumerGroup = "demo_consumer", topic = "demo-topic") //配置mq的消费组与主题
public class DemoConsumers1 implements RocketMQListener<User> { //实现监听器,泛型定义消息类型
    @Override //实现onMessage方法
    public void onMessage(User user) {
        System.out.println("Consumers1接收消息:" + user.toString());
    }
}

3.启动boot项目,即可加载监听器

1.5.3、其他类型的消息

1.5.3.1、生产者

生产者负责发送消息,我们之前学到发送消息的类型有同步发送,异步发送,单向发送,演示消息和批量消息

1.5.3.1.1、异步发送
@GetMapping("/asyncSend")
public String asyncSend(){
    User user = new User("hy", "10086");
    /**
     * 发送主题
     * 发送内容
     * 实现SendCallback方法
     */
    template.asyncSend("demo-topic", user, new SendCallback() {
        //成功处理
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }

        //失败处理
        @Override
        public void onException(Throwable throwable) {
            System.out.println("发送失败");
        }
    });
    return "异步消息发送成功";
}
1.5.3.1.2、单向发送
rocketMQTemplate.sendOneWay("topic9",user);
1.5.3.1.3、延时消息
rocketMQTemplate.syncSend("topic9", MessageBuilder.withPayload("test delay").build(),2000,2);
1.5.3.1.4、批量消息
//批量发送
@GetMapping("batchSend")
public String batchSend(){
    List<Message> msgList = new ArrayList<>();
    msgList.add(new Message("topic6", "tag1", "msg1".getBytes()));
    msgList.add(new Message("topic6", "tag1", "msg2".getBytes()));
    msgList.add(new Message("topic6", "tag1", "msg3".getBytes()));
    template.syncSend("topic8",msgList,1000);
    return "批量发送成功" ;
}

1.5.3.2、消费者

在消费者中,我们用到了过滤和消息模式

1.5.3.2.1、消息过滤

我们可以对消息进行类似于sql语句的过滤

@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",selectorExpression = "age>18"
,selectorType= SelectorType.SQL92)
1.5.3.2.2、消息模式

我们可以对消息的接收模式进行设置,分为广播模式和负载均衡模式,默认为负载均衡模式

@RocketMQMessageListener(topic = "topic9",consumerGroup = "group1",messageModel = MessageModel.BROADCASTING)
1.6.3.2.3、消费者总结

值得注意的是,我们消费者最主要的就是@RocketMQMessageListener这个注解。那么很多其他情况我们可以阅读其源码来解决

1.6、消息顺序

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

rocketmq默认每个主题(topic)创建四个队列(queue),可以在配置文件中修改

1.6.1、消息错乱的原因

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

11

如上图所示,如果采用分区有序(队列有序),那么queue0队列的执行就是创建直接到完成,这不符合我们顺序消费的规定。因此,在这种场景下,我们需要实现全局有序

全局有序的效果

12

1.6.1.1、顺序发送实现

如前图所示,我们需要实现全局顺序发送,意味着同一个事务需要发送到相同的队列里面去。也就是指定发送队列(queue)。

那么如何实现同一事务发送同一队列(queue)中呢?

相同事务,我们一般会有一个id,通过此id对队列的总数取模,就可以发送至同一队列中。注意这只是其中的一个算法,我们也可以采用hash等等散列算法来实现进入同一队列。

1.实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class OrderStep {
    private long orderId;
    private String desc;
}

2.生产者生产消息

根据事务id发送到指定队列,用事务id对队列总数取模

//顺序发送
@GetMapping("orderSend")
public String orderSend() {
    List<OrderStep> orderSteps = buildOrders();
    for (OrderStep orderStep : orderSteps) {
        //设置发送的队列
        template.setMessageQueueSelector(new MessageQueueSelector() {
            //实现指定队列
            @Override
            public MessageQueue select(List<MessageQueue> list, Message message, Object obj) {
                long orderId = orderStep.getOrderId();
                long mqIndex = orderId % list.size(); // 订单id对集合取模
                return list.get((int) mqIndex);
            }
        });


        //发送
        template.convertAndSend("order-topic", orderStep);
    }
    return "顺序发送成功";
}

private List<OrderStep> buildOrders() {
    List<OrderStep> orderList = new ArrayList<>();
    orderList.add(new OrderStep(1L, "创建"));
    orderList.add(new OrderStep(2L, "创建"));
    orderList.add(new OrderStep(1L, "付款"));
    orderList.add(new OrderStep(3L, "创建"));
    orderList.add(new OrderStep(2L, "付款"));
    orderList.add(new OrderStep(3L, "付款"));
    orderList.add(new OrderStep(2L, "完成"));
    orderList.add(new OrderStep(1L, "推送"));
    orderList.add(new OrderStep(3L, "完成"));
    orderList.add(new OrderStep(1L, "完成"));

    return orderList;
}

3.消费者消费消息

@Component
@RocketMQMessageListener(consumerGroup = "consumer", topic = "order-topic")
public class 全局有序 implements RocketMQListener<OrderStep> {
    @Override
    public void onMessage(OrderStep message) {
        System.out.println(message.toString());
    }

}

1.6.1.2、提出疑问

在我们使用普通客户端发送的时候,发送已经无序了,但是整和了boot的RocketMQMessageListener消费依然有序,考虑底层是否优化了全局顺序消费。又有可能是测试并发比较小,生产者生产一条消费者马上消费一条,而不存在库存问题而消息顺序错乱。

普通客户端消费如下

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        List<OrderStep> orderList = new Producer().buildOrders();

        //设置消息进入到指定的消息队列中
        for (final OrderStep order : orderList) {
            Message msg = new Message("topic1", order.toString().getBytes());
            //发送时要指定对应的消息队列选择器
            SendResult result = producer.send(msg, new MessageQueueSelector() {
                //设置当前消息发送时使用哪一个消息队列
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    //根据发送的信息不同,选择不同的消息队列
                    //根据id来选择一个消息队列的对象,并返回->id得到int值
                    long orderId = order.getOrderId();
                    long mqIndex = orderId % list.size();
                    return list.get((int) mqIndex);
                }
            }, null);
            System.out.println(result);
        }

        producer.shutdown();
    }

}

1.7、事务消息

抽象模型如下:

13

正常流程如下:

13

  1. 提交状态:允许进入队列,此消息与非事务消息无区别
  2. 回滚状态:不允许进入队列,此消息等同于未发送过
  3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
  4. 注意:事务消息仅与生产者有关,与消费者无关

1.7.1、测试事务消息

public class TransactionMQMessage {
    public static void main(String[] args) throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer("transaction-group");
        producer.setNamesrvAddr("10.80.168.18:9876");

        //业务流程
        producer.setTransactionListener(new TransactionListener() {
            //正常事务过程,需要返回一个事务的状态,是枚举类型
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                /**
                 * 执行事务语句,正常的业务流程
                 * 我们的返回类型是枚举类
                 * 如果成功返回LocalTransactionState.COMMIT_MESSAGE
                 * 如果失败需要回魂,返回LocalTransactionState.ROLLBACK_MESSAGE
                 * 如果没有及时得到消息需要返回返回不明确,LocalTransactionState.UNKNOW
                 */
                try {
                    System.out.println("执行正常过程");

                    String ans = "success";
                    if(ans.equals("success"))
                    return LocalTransactionState.COMMIT_MESSAGE;
                }catch (Exception e){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 事务补偿过程,只有在执行本地事务中
             * 只有在正常流程中出现了不明确的定义(LocalTransactionState.UNKNOW),才会执行
             * 补偿出现的原因:返回确定的消息,MQ并没有收到
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                //执行补偿的语句
                System.out.println("事务补偿");

                //补偿过程,与正常流程的返回类似。如果查询到保存了数据库,则返回成功提交。如果查不到,则提交回滚
                //如果补偿还是不明确(COMMIT_MESSAGE),这样就会去运维人员手动介入,通过日志或者redis等等方式
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();
        String msg = "事务消息完成";
        Message tranMsg = new Message("transaction", msg.getBytes(StandardCharsets.UTF_8));
        //发送消息注意是sendMessageInTransaction
        TransactionSendResult sendResult = producer.sendMessageInTransaction(tranMsg, null);
        System.out.println(sendResult);

        //事务补偿生产者一定要一直启动着,不能关闭
        //producer.shutdown();
    }
}

1.7.2、业务模拟

假设张三要向李四转账一百元,并且两家银行是不同的

14

于是,这就涉及到了分布式事务的问题,张三有张三家的数据库,李四有李四家的数据库,那么我们就可以使用MQ来进行分布式事务的解决

15

张三相关业务:张三请求接口,于是张三的银行开始本地操作,先向MQ发送半消息(李四账户加一百元)。然后进行减扣库存操作。

  • 如果成功,则将半消息发送出去
  • 如果失败,则不发送消息
  • 如果一直没收到消息,则会反向询问消息

这样保证了,如果减扣库存,则消息一定发的出去,如果没有减扣库存,则消息不会发出去影响李四库存

李四相关业务:如果张三成功扣除了一百元,则MQ必然会有消息(持久化),则必然会向李四系统发送增加一百元的消息。如果消费失败,并且轮序消费一直失败,则可以使用死信队列,之后人工处理

1.8、集群高可用

1.8.1、RocketMQ集群分类

  1. 单机
    1. 一个broker提供服务(宕机后服务瘫痪)
  2. 集群
    1. 多个broker提供服务(单机宕机后消息无法及时被消费)
    2. 多个master多个slave
      1. master到slave消息同步方式为同步[有消息立刻同步](较异步方式性能略低,消息无延迟)
      2. master到slave消息同步方式为异步[缓存之后统一同步](较同步方式性能略高,数据略有延迟)

1.8.2、集群特点

16

RocketMQ集群工作流程

  1. 步骤1:NameServer启动,开启监听,等待broker、producer与consumer连接
  2. 步骤2:broker启动,根据配置信息,连接所有的NameServer,并保持长连接
  3. 步骤2补充:如果broker中有现存数据, NameServer将保存topic与broker关系
  4. 步骤3:producer发信息,连接某个NameServer,并建立长连接
  5. 步骤4:producer发消息
    1. 步骤4.1若果topic存在,由NameServer直接分配
    2. 步骤4.2如果topic不存在,由NameServer创建topic与broker关系,并分配
  6. 步骤5:producer在broker的topic选择一个消息队列(从列表中选择)
  7. 步骤6:producer与broker建立长连接,用于发送消息
  8. 步骤7:producer发送消息

comsumer工作流程同producer

1.8.3、集群搭建

17

操作步骤:注意两台机器同时操作

  1. 配置服务器环境:
vim /etc/hosts
# nameserver
192.168.200.129 rocketmq-nameserver1
192.168.200.130 rocketmq-nameserver2
# broker
192.168.200.129 rocketmq-master1
192.168.200.129 rocketmq-slave2
192.168.200.130 rocketmq-master2
192.168.200.130 rocketmq-slave1
  1. 配置完毕后重启网卡,应用配置
systemctl restart network
  1. 关闭防火墙或者开发指定端口对外提供服务
# 关闭防火墙
systemctl stop firewalld.service
# 查看防火墙的状态
firewall-cmd --state
# 禁止firewall开机启动
systemctl disable firewalld.service
  1. 配置服务器环境
vim /etc/profile
#set rocketmq
ROCKETMQ_HOME=/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PATH
  1. 配置完毕后重启网卡,应用配置
source /etc/profile
  1. 将rocketmq解压到/rocketmq

  2. 创建集群服务器的数据存储目录

  3. 注意master与slave如果在同一个虚拟机中部署,需要将存储目录区分开

  4. broker配置样例

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

1.8.4、rocketmq-console集群监控平台搭建

  1. incubator-rocketmq-externals是一个基于rocketmq的基础之上扩展开发的开源项目
  2. 获取地址:https://github.com/apache/rocketmq-externals
  3. rocketmq-console是一款基于java环境开发的(springboot)的管理控制台工具

1.9、队列的高级特性

1.9.1、持久化消息

1.9.1.1、传统数据库存储

如果将消息存在内存当中,其速度虽然很快,但是一旦宕机,数据就可能会丢失。因此,我们可以使用数据库,来持久化数据。

18

  1. 消息生成者发送消息到MQ
  2. MQ收到消息,将消息进行持久化,存储该消息
  3. MQ返回ACK给生产者
  4. MQ push 消息给对应的消费者
  5. 消息消费者返回ACK给MQ
  6. MQ删除消息

注意:

  1. 第⑤步MQ在指定时间内接到消息消费者返回ACK,MQ认定消息消费成功,执行⑥
  2. 第⑤步MQ在指定时间内未接到消息消费者返回ACK,MQ认定消息消费失败,重新执行④⑤⑥

1.9.1.2、存储调优

1.9.1.2.1、自定义文件格式

我们通常使用数据库进行数据持久化,但是我们知道采用数据库的方式多了数据库软件这一层,mysql本质上还是beanlog日志文件存储了。那我们为什么不自定义格式呢?

  1. 数据库
    1. ActiveMQ
    2. 缺点:数据库瓶颈将成为MQ瓶颈
  2. 文件系统
    1. RocketMQ/Kafka/RabbitMQ
    2. 解决方案:采用消息刷盘机制进行数据存储
    3. 缺点:硬盘损坏的问题无法避免

自定义存储文件格式为commitlog

1.9.1.2.2、大文件顺序读写

rocketmq预先向磁盘申请空间,如10g。空间开辟之后,再使用顺序读写的方式存消息。这样避免了接收消息,再存盘照常的随机读写。

磁盘空间大小,可以在配置文件中修改

#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
1.9.1.2.3、零拷贝技术
  1. 数据传输由传统的4次复制简化成3次复制,减少1次复制过程
  2. Java语言中使用MappedByteBuffer类实现了该技术
  3. 要求:预留存储空间,用于保存数据(1G存储空间起步)

19

在传统linux操作系统中,每个用户文件需要做隔离,因此中间有一层用户态用作隔离,这里零拷贝技术,也就跳过了这一层节省了时间。

1.9.2、消息存储结构

  1. MQ数据存储区域包含如下内容
    1. 消息数据存储区域(存储消息实体数据)
      1. topic
      2. queueId
      3. message
    2. 消费逻辑队列(offset偏移量,用于标注当前消费消息id,保证顺序消费)
      1. minOffset
      2. maxOffset
      3. consumerOffset
    3. 索引
      1. key索引
      2. 创建时间索引

20

1.9.3、刷盘机制

刷盘机制,也就是消息队列将数据持久化的过程

rocketmq有两种模式,同步刷盘与异步刷盘,我们可以将它理解为redis的aof的集中持久化方式。

1.9.3.1、同步刷盘

同步刷盘

  1. 生产者发送消息到MQ,MQ接到消息数据
  2. MQ挂起生产者发送消息的线程
  3. MQ将消息数据写入内存
  4. 内存数据写入硬盘
  5. 磁盘存储后返回SUCCESS
  6. MQ恢复挂起的生产者线程
  7. 发送ACK到生产者

21

每一条消息进来,都会阻塞线程,等待持久化之后才开放

1.9.3.2、异步刷盘

  1. 生产者发送消息到MQ,MQ接到消息数据
  2. MQ将消息数据写入内存
  3. 发送ACK到生产者

23

1.9.3.3、刷盘机制总结

  1. 同步刷盘:安全性高,效率低,速度慢(适用于对数据安全要求较高的业务)
  2. 异步刷盘:安全性低,效率高,速度快(适用于对数据处理速度要求较高的业务)

配置方式

#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH

1.9.4、高可用架构设计

  1. nameserver
    • 无状态+全服务器注册
  2. 消息服务器
    • 主从架构(2M-2S)
  3. 消息生产
    • 生产者将相同的topic绑定到多个group组,保障master挂掉后,其他master仍可正常进行消 息接收
  4. 消息消费
    • RocketMQ自身会根据master的压力确认是否由master承担消息读取的功能,当master繁忙 时候,自动切换由slave承担数据读取的工作

1.9.5、集群的主从复制

  1. 同步复制

    1. master接到消息后,先复制到slave,然后反馈给生产者写操作成功
    2. 优点:数据安全,不丢数据,出现故障容易恢复
    3. 缺点:影响数据吞吐量,整体性能低
  2. 异步复制

    1. master接到消息后,立即返回给生产者写操作成功,当消息达到一定量后再异步复制到slave
    2. 优点:数据吞吐量大,操作延迟低,性能高
    3. 缺点:数据不安全,会出现数据丢失的现象,一旦master出现故障,从上次数据同步到故障时间的数据将丢失
  3. 配置方式

#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER

1.9.6、负载均衡

我们知道,负载均衡分为客户端负载均衡和服务端负载均衡。而在我们rocketmq中,就有客户端的负载均衡。分为生产者负载均衡,消费者负载均衡。

  1. 生产者(Producer)负载均衡
    1. 内部实现了不同broker集群中对同一topic对应消息队列的负载均衡
  2. 消费者(Consumer)负载均衡
    1. 平均分配
    2. 循环平均分配
  3. 广播模式(不参与负载均衡)

1.9.6.1、生产者负载均衡

生产者负载均衡如下图所示。

在同一个rocketmq集群内,不同的broker集群对同一topic对应都会建立队列,通过负载均衡算法进行负载。

25

1.9.6.2、消费者负载均衡

消费者负载均衡为平均分配和循环平均分配

  1. 平均分配:将队列平均的分配给消费者集群
  2. 循环平均分配:每个broker都会分配一条队列给消费者(解决宕机无消费问题)

应该注意的是,消费的广播模式,不存在负载均衡,所有消费者都应该得到和消费消息

平均分配如下图所示

26

循环平均消费如下图所示

对比平均分配可以发现,当平均分配模式的一个broker宕机的时候,则对应的消费者一条消息都收不到。而不同于平均消费而言,循环平均消费能够在broker宕机的时候同样起负载均衡的作用

27

1.9.7、消息重试机制

我们知道,消息分为顺序消息和无序消息

1.9.7.1、顺序消息重试

为了保证消息的顺序性,当出现消费失败时,我们会阻塞后续消息进行重发。而默认重发机制是每秒钟重试一次。

应该注意的是,因为会阻塞后续消息,我们必须监控顺序消费队列

1.9.7.2、无序消费重试

  1. 无序消息包括普通消息、定时消息、延时消息、事务消息
  2. 无序消息重试仅适用于负载均衡(集群)模型下的消息消费,不适用于广播模式下的消息消费
  3. 为保障无序消息的消费,MQ设定了合理的消息重试间隔时长

无序消费重试如下,默认会重试16次,每次时长不同,总时长4小时45分钟40秒

28

1.9.8、死信队列

  1. 当消息消费重试到达了指定次数(默认16次)后,MQ将无法被正常消费的消息称为死信消息(Dead-Letter Message)
  2. 死信消息不会被直接抛弃,而是保存到了一个全新的队列中,该队列称为死信队列(Dead-Letter Queue)
  3. 死信队列特征
    1. 归属某一个组(Gourp Id),而不归属Topic,也不归属消费者
    2. 一个死信队列中可以包含同一个组下的多个Topic中的死信消息
    3. 死信队列不会进行默认初始化,当第一个死信出现后,此队列首次初始化
  4. 死信队列中消息特征
    1. 不会被再次重复消费
    2. 死信队列中的消息有效期为3天,达到时限后将被清除

如何处理死信队列消息:

在监控平台中,通过查找死信,获取死信的messageId,然后通过id对死信进行精准消费。通常出现问题都需要运维人员手动处理

1.9.9、重复消费

在消息队列当中,会出现一个很常见的问题,就是消息接收或者发送了两条,这就造成了重复的消息消费。通常我们也将这个问题称之为幂等性问题。

  1. 消息重复消费原因
    1. 生产者发送了重复的消息
      1. 网络闪断
      2. 生产者宕机
    2. 消息服务器投递了重复的消息
      1. 网络闪断
    3. 动态的负载均衡过程
      1. 网络闪断/抖动
      2. broker重启
      3. 订阅方应用重启(消费者)
      4. 客户端扩容
      5. 客户端缩容

如同下单操作,如果发送了两个支付成功的消息,则会出现重发的事情,这是不被允许的。

1.9.9.1、消费幂等性问题

消息幂等性:对同一条消息,无论消费多少次,结果保持一致,称为消息幂等性

解决方案

  1. 使用业务id作为消息的key
  2. 在消费消息时,客户端对key做判定,未使用过放行,使用过抛弃

注意:messageId由RocketMQ产生,messageId并不具有唯一性,不能作用幂等判定条件

常见的幂等方法示例

执行多次之后,结果发送改变,则不幂等。在接口设计的时候也需要注意

  • 新增:不幂等 insert into order values (……)
  • 查询:幂等
  • 删除:幂等 delete from 表 where id =1
  • 修改:不幂等 update account set balance = balance+100 where no=1
  • 修改:幂等 update account set balance =100 where no=1
1

评论区