腾讯云消息队列 RocketMQ - 查询普通消息
文档简介:
当一条消息从生产者发送到 TDMQ RocketMQ 版服务端,再由消费者进行消费,TDMQ RocketMQ 版会完整记录这条消息中间的流转过程,并以消息轨迹的形式呈现在控制台。
消息轨迹记录了消息从生产端到 TDMQ RocketMQ 版服务端,最后到消费端的整个过程,包括各阶段的时间(精确到微秒)、执行结果、生产者 IP、消费者 IP 等。
当一条消息从生产者发送到 TDMQ RocketMQ 版服务端,再由消费者进行消费,TDMQ RocketMQ 版会完整记录这条消息中间的流转过程,并以消息轨迹的形式呈现在控制台。
消息轨迹记录了消息从生产端到 TDMQ RocketMQ 版服务端,最后到消费端的整个过程,包括各阶段的时间(精确到微秒)、执行结果、生产者 IP、消费者 IP 等。
如果您使用的是 5.0 及以上版本的客户端进行消息的生产和消费,则无需在客户端另行开启轨迹开关。
如果您使用的是 4.x 版本的客户端,则需要在客户端来设置开启消息轨迹功能,具体设置示例如下:
DefaultMQProducer producer = new DefaultMQProducer(namespace, groupName,// ACL权限new AclClientRPCHook(new SessionCredentials(AK, SK)), true, null);
// 实例化消费者DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(NAMESPACE,groupName,new AclClientRPCHook(new SessionCredentials(AK, SK)),new AllocateMessageQueueAveragely(), true, null);
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(NAMESPACE,groupName,new AclClientRPCHook(new SessionCredentials(AK, SK)));// 设置NameServer的地址pullConsumer.setNamesrvAddr(NAMESERVER);pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);pullConsumer.setAutoCommit(false);pullConsumer.setEnableMsgTrace(true);pullConsumer.setCustomizedTraceTopic(null);
package com.lazycece.sbac.rocketmq.messagemodel;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.MessageModel;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Component;/*** @author lazycece* @date 2019/8/21*/@Slf4j@Componentpublic class MessageModelConsumer {@Component@RocketMQMessageListener(topic = "topic-message-model",consumerGroup = "message-model-consumer-group",enableMsgTrace = true,messageModel = MessageModel.CLUSTERING)public class ConsumerOne implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {log.info("ConsumerOne: {}", message);}}}
操作场景
当您需要排查以下问题时,就可以使用 TDMQ RocketMQ 版控制台的消息查询功能,按照时间维度或者根据日志中查到的消息 ID 或消息 Key,来查看具体某条消息的消息内容、消息参数和消息轨迹。
查看某条消息的具体内容,具体参数。
查看消息由哪个生产 IP 发送,是否发送成功,消息到服务端的具体时间。
查看消息是否已持久化。
查看消费由哪些消费者消费了,是否消费成功,消息确认消费的具体时间。
需要做分布式系统的性能分析,查看 MQ 对相关消息处理的时延。
操作步骤
1. 登录 RocketMQ 控制台,在左侧导航栏单击消息查询。
2. 在消息查询页面,选择好地域后根据页面提示输入查询条件。
时间范围:选择需要查询的时间范围,支持近100条(默认按时间顺序展示最近的 100 条消息),近30分钟,近1小时,近6小时,近24小时,近3天和自定义时间范围。
集群:选择需要查询的 Topic 所在的集群。
Topic:选择需要查询的 Topic。
查询方式:消息查询功能支持以下查询方式。
查询全部:该方式适合查询全部消息。
按消息 ID 查询:该方式属于精确查询、速度快、精确匹配。
按消息 Key 查询:该方式属于模糊查询,适用于您没有记录消息 ID 但是设置了消息 Key 的场景。
3. 单击查询,下方列表会展示所有查询到的结果并分页展示。


4. 找到您希望查看内容或参数的消息,单击操作列的查看详情,即可查看消息的基本信息、内容(消息体)、详情参数和消费状态。
在消费状态模块,您可以查看到消费该消息的 Group 以及消费状态,同时还可以在操作栏进行如下操作:
重新发送:将消息重新发送到指定的客户端,如消息已消费成功,重新发送该消息可能导致消费重复。
异常诊断:若消费异常,可以查看异常诊断信息。

5. 单击操作列的查看消息轨迹,或者在详情页单击 Tab 栏的消息轨迹,即可查看该消息的消息轨迹(详细说明请参见 消息轨迹查询结果说明)。

消费验证
在查询到某条消息后,您可以单击操作列的消费验证将该条消息发送到指定的客户端来验证该消息。该功能可能会导致消息重复。
说明
消费验证功能仅用于验证客户端的消费逻辑是否正常,并不会影响正常的收消息流程,因此消息的消费状态等信息在消费验证后并不会改变。
消费验证功能目前暂不支持发送到 Python 和 C++ 客户端。
导出消息
在查询到某条消息后,您可以单击操作列的导出消息将该条消息的消息体,消息 Tag,消息 Key,消息生产时间和消费属性等信息。