通过ssl认证生产与消费消息
在控制台下载当前集群ssl相关证书等文件
运行生产者代码:
替换demo代码的交换器、队列、ip、端口、用户名、密码、两个证书路径然后就可以运行了
package com.ctg.rabbitmq.server.test;
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
import javax.net.ssl.KeyManagerFactory;import javax.net.ssl.SSLContext;import javax.net.ssl.TrustManagerFactory;import java.io.FileInputStream;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.security.KeyStore;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;
public class AMQPProducer {
// 设置交换器,需要在控制台先创建
private final static String EXCHANGE_NAME = "exchangeTest";
// 设置队列名,需要在控制台先创建
private final static String QUEUE_NAME = "helloMQ";
private final static String ROUTING_KEY = "test";
public static void main(String[] args) throws Exception {
// 客户端证书密钥
char[] keyPassphrase = "rabbit".toCharArray();
KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(new FileInputStream("F:\\tmp\\rabbit-client.keycert.p12"), keyPassphrase);
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(ks, keyPassphrase);
// KeyStore密钥
char[] trustPassphrase = "rabbitstore".toCharArray();
KeyStore tks = KeyStore.getInstance("JKS");
tks.load(new FileInputStream("F:\\tmp\\rabbitstore"), trustPassphrase);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(tks);
SSLContext c = SSLContext.getInstance("TLSv1.2");
c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.90.135");
// 输入端口(可在控制台查看)
factory.setPort(5671);
// 输入用户名(可在控制台创建)
factory.setUsername("username");
// 输入密码(可在控制台创建)
factory.setPassword("password");
factory.useSslProtocol(c);
// 设置Vhost,需要在控制台先创建
factory.setVirtualHost("vhost");
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 100; i++) {
String message = "Hello rabbitMQ!" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message +"'");
TimeUnit.SECONDS.sleep(1);
}
//关闭频道和连接
channel.close();
connection.close();
}
}
运行消费者代码:
替换demo代码的队列、ip、端口、用户名、密码、两个证书路径然后就可以运行了
package com.ctg.rabbitmq.server.test;
import com.rabbitmq.client.*;
import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;
public class AMQPConsumer {
private final static String QUEUE_NAME = "helloMQ";
public static void main(String[] args) throws Exception {
char[] keyPassphrase = "rabbit".toCharArray();
KeyStore ks = KeyStore.getInstance("PKCS12");
ks.load(new FileInputStream("F:\\tmp\\rabbit-client.keycert.p12"), keyPassphrase);
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
kmf.init(ks, keyPassphrase);
char[] trustPassphrase = "rabbitstore".toCharArray();
KeyStore tks = KeyStore.getInstance("JKS");
tks.load(new FileInputStream("F:\\tmp\\rabbitstore"), trustPassphrase);
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
tmf.init(tks);
SSLContext c = SSLContext.getInstance("TLSv1.2");
c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机ip
factory.setHost("192.168.90.135");
// 设置amqp的端口号
factory.setPort(5671);
factory.useSslProtocol(c);
// 设置用户名密码
factory.setUsername("rabbitmq");
factory.setPassword("r@bb!tMQ#3333323");
// 设置Vhost,需要在控制台先创建
factory.setVirtualHost("vhost");
//基于网络环境合理设置超时时间
factory.setConnectionTimeout(30 * 1000);
factory.setHandshakeTimeout(30 * 1000);
factory.setShutdownTimeout(0);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}