上云无忧 > 文档中心 > 天翼云分布式消息服务Kafka实践:通过认证生产与消费加密主题的消息
分布式消息服务Kafka
天翼云分布式消息服务Kafka实践:通过认证生产与消费加密主题的消息

文档简介:
分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。 创建用户并配置用户权限 进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。


创建用户并配置用户权限

进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
 


运行生产者客户端,如下是Java客户端代码示例

注意修改内容(用户、密码、接入地址、主题名称)

private static void testPlainSaslProducer() throws Exception {
     Properties props = new Properties();
     // 填写应用用户密码
     String username="user";
     String password="password";
     //注意!密码需要md5
     password = DigestUtils.md5DigestAsHex(password.getBytes());
     String template="org.apache.kafka.common.security.scram.ScramLoginModule required " +
             "username=\"%s\" password=\"%s\";";
     String jaasCfg = String.format(template, username, password);
     props.put("sasl.mechanism", "SCRAM-SHA-512");
     props.put("sasl.jaas.config",jaasCfg);
     props.put("security.protocol", "SASL_PLAINTEXT");
     // 填写sasl接入地址
     props.put("bootstrap.servers", "!sasl_address!");
     props.put("acks", "0");
     props.put("retries", 3);
     props.put("batch.size", 1684);
     props.put("linger.ms", 100);
     props.put("buffer.memory", 33554432); // buffer空间32M
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     Producer<String, String> producer = new KafkaProducer<String, String>(props);
 
     int index = 0;
     TimeUnit.SECONDS.sleep(2);
     while (true) {
         String dvalue = "hello kafka";
         // 填写主题和消息体内容
         ProducerRecord record = new ProducerRecord<>("topicName", "pps200" + index++, dvalue);
         producer.send(record, new Callback() {
             @Override
             public void onCompletion(RecordMetadata paramRecordMetadata, Exception 
paramException) {
                 if (paramRecordMetadata == null) {
                     System.out.println("paramRecordMetadata is null ");
                     paramException.printStackTrace();
                     return;
                 }
                 System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ",
 partition:" + paramRecordMetadata.partition());
             }
         });
         TimeUnit.SECONDS.sleep(1);
     }
 }

运行消费者客户端,如下是Java客户端代码示例

注意修改内容(用户、密码、接入地址、消费组名称、主题名称)

private static void testPlainSaslConsumer() throws Exception {
     Properties props = new Properties();
     // 填写应用用户密码     String username="user";
     String password="password";
     //注意!密码需要md5
     password = DigestUtils.md5DigestAsHex(password.getBytes());
     String template="org.apache.kafka.common.security.scram.ScramLoginModule required " +
             "username=\"%s\" password=\"%s\";";
     String jaasCfg = String.format(template, username, password);
     // 填写sasl接入地址     props.put("bootstrap.servers", "!sasl_address!");
     // 填写消费组名称     props.put("group.id", "!consumerGroup!");
     props.put("security.protocol", "SASL_PLAINTEXT");
     props.put("sasl.mechanism", "SCRAM-SHA-512");
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("sasl.jaas.config",jaasCfg);
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     // // 填写订阅的topic
     consumer.subscribe(Arrays.asList("topicName"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         System.out.printf("==============>poll size = %d%n", records.count());
         for (ConsumerRecord<String, String> record : records) {
             System.out.printf("offset = %d, key = %s, value = %s partition = %s%n",
 record.offset(), record.key(), record.value(), record.partition());
         }
         consumer.commitAsync();//手动提交进度         try {
             Thread.sleep(2000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
 
     }
 }


相似文档
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部