概述
本文介绍在完成消息服务 for Kafka专享版集群创建后,如何通过Kafka命令行访问Kafka集群进行消息的发布及订阅。
其它SDK访问方式请参考SDK文档。
前提条件
- 已完成百度智能云账号注册
- 已创建Kafka集群
- 已完成相应用户及权限的配置
依赖
- 下载Kafka包 kafka_2.13-2.7.2.tgz (解压后 bin 目录下是可执行脚本)
- 具体接入点信息可以在“集群详情-网络安全-接入点查看”页面查看。
具体场景
场景一:无需认证直接通过接入点访问
认证方式选择 None(无需身份认证),且在同 VPC 网络下访问,可以使用 PLAINTEXT 协议接入。
创建 topic
kafka-topics.sh --create --bootstrap-server <接入点地址> --replication-factor 2 --partitions 3 --topic
删除 topic
kafka-topics.sh --delete --bootstrap-server <接入点地址> --topic
查看 topic 列表
kafka-topics.sh --list --bootstrap-server <接入点地址>
查看 topic 详情
kafka-topics.sh --describe --bootstrap-server <接入点地址> --topic
生产者
kafka-console-producer.sh --bootstrap-server <接入点地址> --topic
消费者
kafka-console-consumer.sh --bootstrap-server <接入点地址> --topic --from-beginning
场景二:SASL_PLAINTEXT 接入点访问
在同 VPC 网络下访问且已选择SASL/SCRAM身份认证方式,可使用 SASL_PLAINTEXT 协议接入。
JAAS 配置文件
创建 jaas 配置文件 kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule sufficient
username="alice"
password="alice1234!";
};
注:username,password 填入创建用户时设置的用户名及密码。
配置文件:kafka.properties
提供接入 Kafka 服务需要的配置信息,配置项如下:
配置项 | 配置内容 |
---|---|
bootstrap.servers | 接入点地址 |
security.protocol | SASL_PLAINTEXT |
sasl.mechanism | SCRAM-SHA-512 |
bootstrap.servers=<接入点地址>
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
生产者
# 指定 kafka_client_jaas.conf 文件位置
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'
kafka-console-producer.sh --bootstrap-server <接入点地址> --topic <topic> --producer.config kafka.properties
消费者
# 指定 kafka_client_jaas.conf 文件位置
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'
kafka-console-consumer.sh --bootstrap-server <接入点地址> --topic <topic> --consumer.config kafka.properties --from-beginning
场景三:SASL_SSL 接入点访问
在VPC网络外访问且已选择SASL/SCRAM身份认证方式,可使用SASL_SSL协议接入。
JAAS 配置文件
创建 jaas 配置文件 kafka_client_jaas.conf
KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule sufficient
username="alice"
password="alice1234!";
};
注:username,password 填入创建用户时设置的用户名及密码。
证书文件:client.truststore.jks
下载证书文件: client.truststore.jks
配置文件:kafka.properties
提供接入 Kafka 服务需要的配置信息,配置项如下:
配置项 | 配置内容 |
---|---|
bootstrap.servers | 接入点地址 |
security.protocol | SASL_SSL |
ssl.truststore.location |
配置为 client.truststore.jks 文件路径 例如:/etc/kafka/ssl/client.truststore.jks |
ssl.truststore.password | bms@kafka |
ssl.endpoint.identification.algorithm | 固定为空 |
sasl.mechanism | 固定为 SCRAM-SHA-512 |
bootstrap.servers=<接入点地址>
security.protocol=SASL_SSL
ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password=bms@kafka
ssl.endpoint.identification.algorithm=
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
ssl.protocol = TLSv1.2
sasl.mechanism=SCRAM-SHA-512
生产者
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'
kafka-console-producer.sh --bootstrap-server <接入点地址> --topic <topic> --producer.config kafka.properties
消费者
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf' kafka-console-consumer.sh --bootstrap-server <接入点地址> --topic <topic> --consumer.config
kafka.properties --from-beginning