文档简介:
引入依赖
1.<dependency>
2. <groupId>com.rabbitmq</groupId>
3. <artifactId>amqp-client</artifactId>
4. <version>5.7.0</version>
5.</dependency>
可以通过下载JAR包来引入依赖。
绑定BindingKey
代码示例:
1.import com.rabbitmq.client.BuiltinExchangeType;
2.import com.rabbitmq.client.Channel;
3.import com.rabbitmq.client.Connection;
4.import com.rabbitmq.client.ConnectionFactory;
5.
6.import java.io.IOException;
7.import java.util.concurrent.TimeoutException;
8.
9.public class RabbitmqBindingKey {
10.
11. private final static String EXCHANGE_NAME = "exchangeTest";
12. private final static String QUEUE_NAME = "helloMQ";
13. private final static String ROUTING_KEY = "test";
14.
15. public static void main(String[] args) throws IOException, TimeoutException {
16. // 创建连接工厂
17. ConnectionFactory factory = new ConnectionFactory();
18.
19. // 设置主机ip
20. factory.setHost("192.168.3.113");
21. // 设置amqp的端口号
22. factory.setPort(5672);
23. // 设置用户名密码
24. factory.setUsername("rabbitmq");
25. factory.setPassword("r@bb!tMQ#3333323");
26.
27. // 设置Vhost,需要在控制台先创建
28. factory.setVirtualHost("vhost");
29.
30. //基于网络环境合理设置超时时间
31. factory.setConnectionTimeout(30 * 1000);
32. factory.setHandshakeTimeout(30 * 1000);
33. factory.setShutdownTimeout(0);
34.
35. Connection connection = factory.newConnection();
36. Channel channel = connection.createChannel();
37.
38. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
39.
40. // 创建 。Queue 可以在控制台创建,也可以用API创建
41. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
42.
43. // Queue 与 Exchange进行绑定,注册 BindingKeyTest
44. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
45.
46. connection.close();
47.
48. }
49.}
完成后,可以在实例列表的交换器选项卡和队列选项卡查看结果。
生产消息
代码示例:
1.import com.rabbitmq.client.Channel;
2.import com.rabbitmq.client.Connection;
3.import com.rabbitmq.client.ConnectionFactory;
4.
5.import java.io.IOException;
6.import java.nio.charset.StandardCharsets;
7.import java.util.concurrent.TimeUnit;
8.import java.util.concurrent.TimeoutException;
9.
10.public class RabbitmqProducer {
11.
12. // private final static String EXCHANGE_NAME = "exchangeTest";
13. private final static String QUEUE_NAME = "helloMQ";
14. // private final static String ROUTING_KEY = "test";
15.
16. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
17. // 创建连接工厂
18. ConnectionFactory factory = new ConnectionFactory();
19.
20. // 设置主机ip
21. factory.setHost("192.168.3.113");
22. // 设置amqp的端口号
23. factory.setPort(5672);
24. // 设置用户名密码
25. factory.setUsername("username");
26. factory.setPassword("password");
27.
28.
29. // 设置Vhost,需要在控制台先创建
30. factory.setVirtualHost("test");
31.
32. //基于网络环境合理设置超时时间
33. factory.setConnectionTimeout(30 * 1000);
34. factory.setHandshakeTimeout(30 * 1000);
35. factory.setShutdownTimeout(0);
36.
37. // 创建一个连接
38. Connection connection = factory.newConnection();
39.
40. // 创建一个频道
41. Channel channel = connection.createChannel();
42.
43. // 发送方消息确认
44. // channel.confirmSelect();
45. // 启用发送方事务机制
46. // channel.txSelect();
47. // 指定一个队列
48. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
49.
50. for (int i = 0; i < 100; i++) {
51. // 发送的消息
52. String message = "Hello rabbitMQ!_" + i;
53. // 往队列中发送一条消息,使用默认的交换器
54. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
55. // 使用自定义交换器,需要在管理台预先建好,并设置routing key
56. // channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes(StandardCharsets.UTF_8));
57. System.out.println(" [x] Sent '" + message + "'");
58. TimeUnit.MILLISECONDS.sleep(100);
59. }
60.
61. //关闭频道和连接
62. channel.close();
63. connection.close();
64.
65. }
66.}
消息发送后,可以进入控制台,在实例列表的队列选项卡查看消息发送状态。
消费消息
代码示例:
1.import com.rabbitmq.client.*;
2.
3.import java.io.IOException;
4.import java.nio.charset.StandardCharsets;
5.import java.util.concurrent.TimeoutException;
6.
7.public class RabbitmqConsumer {
8. // 队列名称
9. private final static String QUEUE_NAME = "helloMQ";
10.
11. public static void main(String[] args) throws IOException, TimeoutException {
12. // 创建连接工厂
13. ConnectionFactory factory = new ConnectionFactory();
14. // 设置主机ip
15. factory.setHost("192.168.3.113");
16. // 设置amqp的端口号
17. factory.setPort(5672);
18. // 设置用户名密码
19. factory.setUsername("username");
20. factory.setPassword("password");
21.
22. // 设置Vhost,需要在控制台先创建
23. factory.setVirtualHost("test");
24.
25. //基于网络环境合理设置超时时间
26. factory.setConnectionTimeout(30 * 1000);
27. factory.setHandshakeTimeout(30 * 1000);
28. factory.setShutdownTimeout(0);
29.
30. Connection connection = factory.newConnection();
31. Channel channel = connection.createChannel();
32.
33. //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
34. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
35. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
36.
37. Consumer consumer = new DefaultConsumer(channel) {
38. @Override
39. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
40. String message = new String(body, StandardCharsets.UTF_8);
41. System.out.println(" [x] Received '" + message + "'");
42.
43. }
44. };
45. channel.basicConsume(QUEUE_NAME, true, consumer);
46.
47.
48. }
49.
50.}
完成上述步骤后,可以在控制台查看消费者是否启动成功。
完成以上所有步骤后,就成功接入了RabbitMQ服务,可以用消息队列进行消息发送和订阅了。