腾讯云消息队列 RabbitMQ 版 - 使用 SDK 收发消息
文档简介:
操作场景:
本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
操作场景
本文以调用 Java SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
前提条件
完成资源创建与准备
安装1.8或以上版本 JDK
安装2.5或以上版本 Maven
下载 Demo
操作步骤
步骤1:安装 Java 依赖库
在 pom.xml 添加以下依赖:
<!-- in your <dependencies> block --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.13.0</version></dependency>
步骤2:生产消息
编译并运行 MessageProducer.java。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.tencent.tdmq.demo.cloud.Constant;/*** 消息生产者*/public class MessageProducer {/*** 交换机名称*/private static final String EXCHANGE_NAME = "exchange_name";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务地址 (完整复制控制台接入点地址)factory.setUri("amqp://***");// 设置Virtual Hosts (开源 RabbitMQ 控制台复制完整Vhost名称)factory.setVirtualHost(VHOST_NAME);// 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)factory.setUsername(USERNAME);// 设置密码 (对应user的密钥)factory.setPassword("****");// 获取连接、建立通道try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)channel.exchangeDeclare(EXCHANGE_NAME, "fanout");for (int i = 0; i < 10; i++) {String message = "this is rabbitmq message " + i;// 发布消息到交换机,交换机自动将消息投递到相应队列channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [producer] Sent '" + message + "'");}} catch (Exception e) {e.printStackTrace();}}}
参数
|
说明
|
EXCHANGE_NAME
|
Exchange 名称,在控制台 Exchange 列表获取。
|
factory.setUri
|
集群接入地址,在集群基本信息页面的客户端接入模块获取。
![]() |
factory.setVirtualHost
|
Vhost 名称,在控制台 Vhost 列表获取。
|
factory.setUsername
|
用户名称,填写在控制台创建的用户名称。
|
factory.setPassword
|
用户密码,填写在控制台创建用户时填写的密码。
|
步骤3:消费消息
编译并运行 MessageConsumer.java。
import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.tencent.tdmq.demo.cloud.Constant;import java.io.IOException;import java.nio.charset.StandardCharsets;/*** 消息消费者*/public class MessageConsumer1 {/*** 队列名称*/public static final String QUEUE_NAME = "queue_name";/*** 交换机名称*/private static final String EXCHANGE_NAME = "exchange_name";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();// 设置服务地址 (完整复制控制台接入点地址)factory.setUri("amqp://***");// 设置Virtual Hosts (开源 RabbitMQ 控制台中复制完整Vhost名称)factory.setVirtualHost(VHOST_NAME);// 设置用户名 (开源 RabbitMQ 控制台中Vhost的配置权限中的user名称)factory.setUsername(USERNAME);// 设置密码 (对应user的密钥)factory.setPassword("****");// 获取连接Connection connection = factory.newConnection();// 建立通道Channel channel = connection.createChannel();// 绑定消息交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 声明队列信息channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 绑定消息交换机 (EXCHANGE_NAME必须在消息队列RabbitMQ版控制台上已存在,并且Exchange的类型与控制台上的类型一致)channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");System.out.println(" [Consumer1] Waiting for messages.");// 订阅消息channel.basicConsume(QUEUE_NAME, false, "ConsumerTag", new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {//接收到的消息,进行业务逻辑处理。System.out.println("Received: " + new String(body, StandardCharsets.UTF_8) + ", deliveryTag: "
+ envelope.getDeliveryTag() + ", messageId: " + properties.getMessageId());
channel.basicAck(envelope.getDeliveryTag(), false);}});}}
参数
|
说明
|
QUEUE_NAME
|
Queue 名称,在控制台 Queue 列表获取。
|
EXCHANGE_NAME
|
Exchange 名称,在控制台 Exchange 列表获取。
|
factory.setUri
|
集群接入地址,在集群基本信息页面的客户端接入模块获取。
![]() |
factory.setVirtualHost
|
Vhost 名称,在控制台 Vhost 列表获取。
|
factory.setUsername
|
用户名称,填写在控制台创建的用户名称。
|
factory.setPassword
|
用户密码,填写在控制台创建用户时填写的密码。
|