天翼云分布式消息服务Kafka实践:通过认证生产与消费加密主题的消息
文档简介:
分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。
创建用户并配置用户权限
进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。
创建用户并配置用户权限
进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
运行生产者客户端,如下是Java客户端代码示例:
注意修改内容(用户、密码、接入地址、主题名称)
private static void testPlainSaslProducer() throws Exception { Properties props = new Properties(); // 填写应用用户密码 String username="user"; String password="password"; //注意!密码需要md5 password = DigestUtils.md5DigestAsHex(password.getBytes()); String template="org.apache.kafka.common.security.scram.ScramLoginModule required " + "username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(template, username, password); props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("sasl.jaas.config",jaasCfg); props.put("security.protocol", "SASL_PLAINTEXT"); // 填写sasl接入地址 props.put("bootstrap.servers", "!sasl_address!"); props.put("acks", "0"); props.put("retries", 3); props.put("batch.size", 1684); props.put("linger.ms", 100); props.put("buffer.memory", 33554432); // buffer空间32M props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<String, String>(props); int index = 0; TimeUnit.SECONDS.sleep(2); while (true) { String dvalue = "hello kafka"; // 填写主题和消息体内容 ProducerRecord record = new ProducerRecord<>("topicName", "pps200" + index++, dvalue); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata paramRecordMetadata, Exception
paramException) { if (paramRecordMetadata == null) { System.out.println("paramRecordMetadata is null "); paramException.printStackTrace(); return; } System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ",
partition:" + paramRecordMetadata.partition()); } }); TimeUnit.SECONDS.sleep(1); } }
运行消费者客户端,如下是Java客户端代码示例:
注意修改内容(用户、密码、接入地址、消费组名称、主题名称)
private static void testPlainSaslConsumer() throws Exception { Properties props = new Properties(); // 填写应用用户密码 String username="user"; String password="password"; //注意!密码需要md5 password = DigestUtils.md5DigestAsHex(password.getBytes()); String template="org.apache.kafka.common.security.scram.ScramLoginModule required " + "username=\"%s\" password=\"%s\";"; String jaasCfg = String.format(template, username, password); // 填写sasl接入地址 props.put("bootstrap.servers", "!sasl_address!"); // 填写消费组名称 props.put("group.id", "!consumerGroup!"); props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "SCRAM-SHA-512"); props.put("enable.auto.commit", "false"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("sasl.jaas.config",jaasCfg); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // // 填写订阅的topic consumer.subscribe(Arrays.asList("topicName")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); System.out.printf("==============>poll size = %d%n", records.count()); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s partition = %s%n",
record.offset(), record.key(), record.value(), record.partition()); } consumer.commitAsync();//手动提交进度 try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }