初学 RocketMQ 之消息堆积

TestWhite
TestWhite
订阅者
258
文章
0
粉丝
测试交流1 294字数 952阅读3分10秒阅读模式

上次我们简单的模拟了消费乱序的问题,今天我们聊聊在 RokcetMQ 中,关于消息堆积的问题

接下来我们聊聊消息堆积文章源自玩技e族-https://www.playezu.com/188669.html

什么是消息堆积 ?

字面意思:堆积,就是把事物堆积成堆。这里指的就是消息堆积在一起,一直没有被消费或消费的很慢。文章源自玩技e族-https://www.playezu.com/188669.html

消息存储在哪?

消息一般会存在 Broker 服务里面。这里拿我自己搭建的环境,我们来看看(因为我没做好文件映射关系,所以直接进去容器看),一般消息都会放在/${ROCKETMQ_HOME}$/store/里面,实体消息放在 commitLog 文件,consumequeue 是存放消息索引的。这个涉及到消息索引和持久化部分就不具体说明。

初学 RocketMQ 之消息堆积插图
文章源自玩技e族-https://www.playezu.com/188669.html

为什么会堆积?

先说结论,首先消息的生命周期简单来说是 “生产 - 消费” 这样的过程。一般来说生产者不会是消息堆积的诱因(感觉不一定,可以试验一下)。产生消堆积的原因一般是消费速度赶不上生产速度所引起的,可能主要有以下两种类型的代码(希望有更多小伙伴补充一下真实的场景):文章源自玩技e族-https://www.playezu.com/188669.html

  • CPU 计算代码
  • 外部 I/O 操作代码

下面就来模拟一下场景和看看堆积的现象

首先我们要有 “生产者” 和 “消费者” 的角色,这里用的是 spring-boot-starter 快速搭建环境和模拟的文章源自玩技e族-https://www.playezu.com/188669.html

生产者文章源自玩技e族-https://www.playezu.com/188669.html

@Component
public class ReTrySender {
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 普通消息发送
public void delayOrdinarySend() {
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
DefaultMQProducer producer = rocketMQTemplate.getProducer();
producer.setRetryTimesWhenSendFailed(3);
String uuid = UUID.randomUUID().toString().replace("-", "");
String msg = "order" + uuid;
String key = "test";
Message<String> buildMsg = MessageBuilder.withPayload(msg).setHeader("KEYS", key).build();
SendResult sendResult = rocketMQTemplate.syncSend(
MqUntilSecond.tag_topic + ":" + MqUntilSecond.tag, buildMsg);
System.out.println("生产消息:"+ msg + ":" + dateString);
}
}

消费者文章源自玩技e族-https://www.playezu.com/188669.html

@Component
@RocketMQMessageListener(topic = MqUntilSecond.tag_topic,
consumerGroup = MqUntilSecond.consumerGroup,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = MqUntilSecond.maxThread,
consumeMode = ConsumeMode.CONCURRENTLY)
public class ConsumerTag1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
@Override
public void onMessage(String message) {
int i = 1;
while (i < 10000){
try {
i ++;
Thread.sleep(1);
}catch (Exception e){
e.printStackTrace();
}
}
Date currentTime = new Date();
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateString = formatter.format(currentTime);
System.out.println("消费消息:" + message + ":" +  dateString);
}
@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeTimeout(10);
consumer.setMaxReconsumeTimes(2);
}
}

消费者配置文章源自玩技e族-https://www.playezu.com/188669.html

public class MqUntilSecond {
public static final String tag_topic = "tag_topic1"; // 主题
public static final String tag = "tag1";  // 标签
public static final String consumerGroup = "syncGroup"; // 消费集群
public static final int maxThread = 1;  // 最大消费线程数
}

实验部分文章源自玩技e族-https://www.playezu.com/188669.html

  • 在消费者代码部分,我用了循环和线程休眠的方式来模拟一些耗时循环或长链路的场景,我们拿 Jmeter 对接口发起 1000 个请求来看看现象
    初学 RocketMQ 之消息堆积插图1
  • 我们看看 Console 控制台部分,可以看到消费者表单里,syncGroup 的 Delay 字段为 999
    初学 RocketMQ 之消息堆积插图2
  • 我们来看看控制台,这里的消息一直被慢慢的消费,现象是越靠后的消息,生产到消费的时间间隔越大。试想,我拿个系统验证码,一直都收不到那我是不是直接就不用了
    初学 RocketMQ 之消息堆积插图3
    初学 RocketMQ 之消息堆积插图4
  • 我们把这段循环代码去掉,重启服务,再看看现象。消息在一瞬间被处理完毕
    初学 RocketMQ 之消息堆积插图5

总结文章源自玩技e族-https://www.playezu.com/188669.html

  • 首先可以确定的是代码设计不规范会引起消息堆积
  • 但消息堆积不仅仅是因为代码原因,也有可能是业务本身就存在消费赶不上生产,这个时候需要寻求的是其它解决办法
  • 除了代码,还需要关心配置部分。比如:可以调整生产者和消费者的配置,通过梳理业务和调整配置方式来找到系统最优的性能点
  • 实际业务中,如果压测会涉及到消息队列中间件,需要对它进行监控
语音功能测试软件
 
    • ganci
      ganci 9

      根据个人经验,很多代码设计导致的消息积压都是因为 ack,比如由于某些边界情况未手动 ack ,或者消费重试没手动 ack 导致消息堵塞,又或者重复消费过多或者没有超时丢弃而导致的无效执行。 学习了学习了这里其实也可以作为一个技术方案的评审点,对于实时性要求高的,直接用实时通讯方案(如 http )比消息队列要合适。

      个人理解,消息队列本身作用之一,就是可以作为压力的缓冲,让上层短时间高流量以消息积压的方式存在消息队列里,避免下游系统也受到这么高压力一下子被击垮。所以消息堆积只要控制在一定范围内,是可以接受的。然后队列里的积压消息数需要作为一个预警项,当积压数达到一定程度时,人工去确认原因,然后采用适当的手段(如对消费者进行扩容增加消费速度、生产者相关功能暂时关闭或降级)进行处理。嗯嗯,设计评审这部分确实很重要。除了能够测好,有能力在评审阶段提出问题和风险点是很重要的能力。
      仅楼主可见
      仅楼主可见
      仅楼主可见我这个不是性能测试平台呢,只是使用 docker 搭建了一个 rocketMQ 的环境。然后用 Spring-Boot 搭了一个小的服务。写了一个生产者和消费者,再简单的做了个接口。Delay 字段的话,rocketMQ 本身也支持控制台看的。网上搜一下 RocketMQ 环境搭建就好了。

    匿名

    发表评论

    匿名网友
    确定

    拖动滑块以完成验证