上云无忧 > 文档中心 > 天翼云分布式消息服务RabbitMQ实践:通过ssl认证生产与消费消息
分布式消息服务RabbitMQ
天翼云分布式消息服务RabbitMQ实践:通过ssl认证生产与消费消息

文档简介:
通过ssl认证生产与消费消息 在控制台下载当前集群ssl相关证书等文件 运行生产者代码: 替换demo代码的交换器、队列、ip、端口、用户名、密码、两个证书路径然后就可以运行了 package com.ctg.rabbitmq.server.test;
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

通过ssl认证生产与消费消息

在控制台下载当前集群ssl相关证书等文件

 

运行生产者代码:
替换demo代码的交换器、队列、ip、端口、用户名、密码、两个证书路径然后就可以运行了

package com.ctg.rabbitmq.server.test;
import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;
import javax.net.ssl.KeyManagerFactory;import javax.net.ssl.SSLContext;import javax.net.ssl.TrustManagerFactory;import java.io.FileInputStream;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.security.KeyStore;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;

public class AMQPProducer {
    // 设置交换器,需要在控制台先创建
    private final static String EXCHANGE_NAME = "exchangeTest";
    // 设置队列名,需要在控制台先创建
    private final static String QUEUE_NAME = "helloMQ";
    private final static String ROUTING_KEY = "test";

    public static void main(String[] args) throws Exception {
        // 客户端证书密钥
        char[] keyPassphrase = "rabbit".toCharArray();
        KeyStore ks = KeyStore.getInstance("PKCS12");
        ks.load(new FileInputStream("F:\\tmp\\rabbit-client.keycert.p12"), keyPassphrase);

        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
        kmf.init(ks, keyPassphrase);

        // KeyStore密钥
        char[] trustPassphrase = "rabbitstore".toCharArray();
        KeyStore tks = KeyStore.getInstance("JKS");
        tks.load(new FileInputStream("F:\\tmp\\rabbitstore"), trustPassphrase);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);

        SSLContext c = SSLContext.getInstance("TLSv1.2");
        c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);

        ConnectionFactory factory = new ConnectionFactory();
       
        factory.setHost("192.168.90.135");
        // 输入端口(可在控制台查看)
        factory.setPort(5671);
       
        // 输入用户名(可在控制台创建)
        factory.setUsername("username");
        // 输入密码(可在控制台创建)
        factory.setPassword("password");
        factory.useSslProtocol(c);

        // 设置Vhost,需要在控制台先创建
        factory.setVirtualHost("vhost");

        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 100; i++) {
            String message = "Hello rabbitMQ!" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message +"'");
            TimeUnit.SECONDS.sleep(1);
        }
        //关闭频道和连接
        channel.close();
        connection.close();
    }
}

运行消费者代码:
替换demo代码的队列、ip、端口、用户名、密码、两个证书路径然后就可以运行了

package com.ctg.rabbitmq.server.test;
import com.rabbitmq.client.*;
import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;

public class AMQPConsumer {
    private final static String QUEUE_NAME = "helloMQ";

    public static void main(String[] args) throws Exception {
        char[] keyPassphrase = "rabbit".toCharArray();
        KeyStore ks = KeyStore.getInstance("PKCS12");
        ks.load(new FileInputStream("F:\\tmp\\rabbit-client.keycert.p12"), keyPassphrase);

        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
        kmf.init(ks, keyPassphrase);

        char[] trustPassphrase = "rabbitstore".toCharArray();
        KeyStore tks = KeyStore.getInstance("JKS");
        tks.load(new FileInputStream("F:\\tmp\\rabbitstore"), trustPassphrase);

        TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
        tmf.init(tks);

        SSLContext c = SSLContext.getInstance("TLSv1.2");
        c.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // 设置主机ip
        factory.setHost("192.168.90.135");
        // 设置amqp的端口号
        factory.setPort(5671);
        factory.useSslProtocol(c);
        // 设置用户名密码
        factory.setUsername("rabbitmq");
        factory.setPassword("r@bb!tMQ#3333323");
        // 设置Vhost,需要在控制台先创建
        factory.setVirtualHost("vhost");

        //基于网络环境合理设置超时时间
        factory.setConnectionTimeout(30 * 1000);
        factory.setHandshakeTimeout(30 * 1000);
        factory.setShutdownTimeout(0);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + message + "'");

            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }

}


相似文档
  • Q:无法被路由的消息,去了哪里? A:如果没有任何设置,无法路由的消息会被直接丢弃。 无法路由的情况:Routing key不正确。 解决方案: 1.使用mandatory=true配合ReturnListener,实现消息回发。 2.声明交换机时,指定备份交换机。
  • 天翼云分布式消息服务RabbitMQ产品用户使用指南.pdf
  • 广播消息 在同一个消费组内对所有消费者投递相同消息。 消息回溯 支持根据时间重置消费进度 消息数据自动删除功能 在磁盘满后,在保护期外的数据,能自动删除,保证服务可用性 自动故障切换功能 生产消费自动负载均衡,消息节点故障时自动主备切换,保证服务的连续性。
  • 高吞吐,消息多副本异步复制。 高可靠,消息多副本同步复制。
  • 应用用户管理 多个应用可调用同一个消息服务,通过应用用户,对消息服务下的应用接入权限进行管理。 主题管理 支持对实例下的主题进行管理,执行创建删除等操作。 消费组管理 支持对实例下的消费组进行管理。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部