上云无忧 > 文档中心 > 天翼云分布式消息服务RabbitMQ编译工程生产消费消息
分布式消息服务RabbitMQ
天翼云分布式消息服务RabbitMQ编译工程生产消费消息

文档简介:
引入依赖 1. 2. com.rabbitmq 3. amqp-client 4. 5.7.0 5. 可以通过下载JAR包来引入依赖。 绑定BindingKey 代码示例: 1.import com.rabbitmq.client.BuiltinExchangeType;
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

引入依赖

1.<dependency>  

2.    <groupId>com.rabbitmq</groupId>  

3.    <artifactId>amqp-client</artifactId>  

4.    <version>5.7.0</version>  

5.</dependency>  

可以通过下载JAR包来引入依赖。


绑定BindingKey

代码示例:

1.import com.rabbitmq.client.BuiltinExchangeType;  

2.import com.rabbitmq.client.Channel;  

3.import com.rabbitmq.client.Connection;  

4.import com.rabbitmq.client.ConnectionFactory;  

5.  

6.import java.io.IOException;  

7.import java.util.concurrent.TimeoutException;  

8.  

9.public class RabbitmqBindingKey {  

10.  

11.    private final static String EXCHANGE_NAME = "exchangeTest";  

12.    private final static String QUEUE_NAME = "helloMQ";  

13.    private final static String ROUTING_KEY = "test";  

14.  

15.    public static void main(String[] args) throws IOException, TimeoutException {  

16.        // 创建连接工厂  

17.        ConnectionFactory factory = new ConnectionFactory();  

18.  

19.        // 设置主机ip  

20.        factory.setHost("192.168.3.113");  

21.        // 设置amqp的端口号  

22.        factory.setPort(5672);  

23.        // 设置用户名密码  

24.        factory.setUsername("rabbitmq");  

25.        factory.setPassword("r@bb!tMQ#3333323");  

26.  

27.        // 设置Vhost,需要在控制台先创建  

28.        factory.setVirtualHost("vhost");  

29.  

30.        //基于网络环境合理设置超时时间  

31.        factory.setConnectionTimeout(30 * 1000);  

32.        factory.setHandshakeTimeout(30 * 1000);  

33.        factory.setShutdownTimeout(0);  

34.  

35.        Connection connection = factory.newConnection();  

36.        Channel channel = connection.createChannel();  

37.  

38.        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);  

39.  

40.        // 创建 。Queue 可以在控制台创建,也可以用API创建  

41.        channel.queueDeclare(QUEUE_NAME, true, false, false, null);  

42.  

43.        // Queue 与 Exchange进行绑定,注册 BindingKeyTest  

44.        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);  

45.  

46.        connection.close();  

47.  

48.    }  

49.}  

完成后,可以在实例列表的交换器选项卡和队列选项卡查看结果。


生产消息

代码示例:

1.import com.rabbitmq.client.Channel;  

2.import com.rabbitmq.client.Connection;  

3.import com.rabbitmq.client.ConnectionFactory;  

4.  

5.import java.io.IOException;  

6.import java.nio.charset.StandardCharsets;  

7.import java.util.concurrent.TimeUnit;  

8.import java.util.concurrent.TimeoutException;  

9.  

10.public class RabbitmqProducer {  

11.  

12.    // private final static String EXCHANGE_NAME = "exchangeTest";  

13.    private final static String QUEUE_NAME = "helloMQ";  

14.    // private final static String ROUTING_KEY = "test";  

15.  

16.    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {  

17.        // 创建连接工厂  

18.        ConnectionFactory factory = new ConnectionFactory();  

19.  

20.        // 设置主机ip  

21.        factory.setHost("192.168.3.113");  

22.        // 设置amqp的端口号  

23.        factory.setPort(5672);  

24.        // 设置用户名密码  

25.        factory.setUsername("username");  

26.        factory.setPassword("password");  

27.  

28.  

29.        // 设置Vhost,需要在控制台先创建  

30.        factory.setVirtualHost("test");  

31.  

32.        //基于网络环境合理设置超时时间  

33.        factory.setConnectionTimeout(30 * 1000);  

34.        factory.setHandshakeTimeout(30 * 1000);  

35.        factory.setShutdownTimeout(0);  

36.  

37.        // 创建一个连接  

38.        Connection connection = factory.newConnection();  

39.  

40.        // 创建一个频道  

41.        Channel channel = connection.createChannel();  

42.  

43.        // 发送方消息确认  

44.        // channel.confirmSelect();  

45.        // 启用发送方事务机制  

46.        // channel.txSelect();  

47.        // 指定一个队列  

48.        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  

49.  

50.        for (int i = 0; i < 100; i++) {  

51.            // 发送的消息  

52.            String message = "Hello rabbitMQ!_" + i;  

53.            // 往队列中发送一条消息,使用默认的交换器  

54.            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));  

55.            // 使用自定义交换器,需要在管理台预先建好,并设置routing key  

56.            // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));  

57.            System.out.println(" [x] Sent '" + message + "'");  

58.            TimeUnit.MILLISECONDS.sleep(100);  

59.        }  

60.  

61.        //关闭频道和连接  

62.        channel.close();  

63.        connection.close();  

64.  

65.    }  

66.}  

消息发送后,可以进入控制台,在实例列表的队列选项卡查看消息发送状态。


消费消息

代码示例:

1.import com.rabbitmq.client.*;  

2.  

3.import java.io.IOException;  

4.import java.nio.charset.StandardCharsets;  

5.import java.util.concurrent.TimeoutException;  

6.  

7.public class RabbitmqConsumer {  

8.    // 队列名称  

9.    private final static String QUEUE_NAME = "helloMQ";  

10.  

11.    public static void main(String[] args) throws IOException, TimeoutException {  

12.        // 创建连接工厂  

13.        ConnectionFactory factory = new ConnectionFactory();  

14.        // 设置主机ip  

15.        factory.setHost("192.168.3.113");  

16.        // 设置amqp的端口号  

17.        factory.setPort(5672);  

18.        // 设置用户名密码  

19.        factory.setUsername("username");  

20.        factory.setPassword("password");  

21.  

22.        // 设置Vhost,需要在控制台先创建  

23.        factory.setVirtualHost("test");  

24.  

25.        //基于网络环境合理设置超时时间  

26.        factory.setConnectionTimeout(30 * 1000);  

27.        factory.setHandshakeTimeout(30 * 1000);  

28.        factory.setShutdownTimeout(0);  

29.  

30.        Connection connection = factory.newConnection();  

31.        Channel channel = connection.createChannel();  

32.  

33.        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。  

34.        channel.queueDeclare(QUEUE_NAME, false, false, false, null);  

35.        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  

36.  

37.        Consumer consumer = new DefaultConsumer(channel) {  

38.            @Override  

39.            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  

40.                String message = new String(body, StandardCharsets.UTF_8);  

41.                System.out.println(" [x] Received '" + message + "'");  

42.  

43.            }  

44.        };  

45.        channel.basicConsume(QUEUE_NAME, true, consumer);  

46.  

47.  

48.    }  

49.  

50.}  

完成上述步骤后,可以在控制台查看消费者是否启动成功。

完成以上所有步骤后,就成功接入了RabbitMQ服务,可以用消息队列进行消息发送和订阅了。 


相似文档
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部