上云无忧 > 文档中心 > 百度智能云专享版消息服务 for Kafka 入门教程四:连接Kafka集群实例
消息服务 for Kafka
百度智能云专享版消息服务 for Kafka 入门教程四:连接Kafka集群实例

文档简介:
概述: 本文介绍在完成消息服务 for Kafka专享版集群创建后,如何通过Kafka命令行访问Kafka集群进行消息的发布及订阅。 其它SDK访问方式请参考SDK文档。 前提条件: 已完成百度智能云账号注册。 已创建Kafka集群。 已完成相应用户及权限的配置。
*此产品及展示信息均由百度智能云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

概述

本文介绍在完成消息服务 for Kafka专享版集群创建后,如何通过Kafka命令行访问Kafka集群进行消息的发布及订阅。

其它SDK访问方式请参考SDK文档。

前提条件

  • 已完成百度智能云账号注册
  • 已创建Kafka集群
  • 已完成相应用户及权限的配置

依赖

  • 下载Kafka包 kafka_2.13-2.7.2.tgz (解压后 bin 目录下是可执行脚本)
  • 具体接入点信息可以在“集群详情-网络安全-接入点查看”页面查看。

具体场景

场景一:无需认证直接通过接入点访问

认证方式选择 None(无需身份认证),且在同 VPC 网络下访问,可以使用 PLAINTEXT 协议接入。

创建 topic

kafka-topics.sh --create --bootstrap-server <接入点地址> --replication-factor 2 --partitions 3 --topic

删除 topic

kafka-topics.sh --delete --bootstrap-server <接入点地址> --topic

查看 topic 列表

kafka-topics.sh --list --bootstrap-server <接入点地址>

查看 topic 详情

kafka-topics.sh --describe --bootstrap-server <接入点地址> --topic

生产者

kafka-console-producer.sh --bootstrap-server <接入点地址> --topic

消费者

kafka-console-consumer.sh --bootstrap-server <接入点地址> --topic  --from-beginning

场景二:SASL_PLAINTEXT 接入点访问

在同 VPC 网络下访问且已选择SASL/SCRAM身份认证方式,可使用 SASL_PLAINTEXT 协议接入。

JAAS 配置文件

创建 jaas 配置文件 kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule sufficient
    username="alice"
    password="alice1234!";
};

注:username,password 填入创建用户时设置的用户名及密码。

配置文件:kafka.properties

提供接入 Kafka 服务需要的配置信息,配置项如下:

配置项 配置内容
bootstrap.servers 接入点地址
security.protocol SASL_PLAINTEXT
sasl.mechanism SCRAM-SHA-512
bootstrap.servers=<接入点地址>

security.protocol=SASL_PLAINTEXT

sasl.mechanism=SCRAM-SHA-512

生产者

# 指定 kafka_client_jaas.conf 文件位置
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'

kafka-console-producer.sh --bootstrap-server <接入点地址> --topic <topic>  --producer.config kafka.properties

消费者

# 指定 kafka_client_jaas.conf 文件位置
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'

kafka-console-consumer.sh --bootstrap-server <接入点地址> --topic <topic>  --consumer.config kafka.properties --from-beginning

场景三:SASL_SSL 接入点访问

在VPC网络外访问且已选择SASL/SCRAM身份认证方式,可使用SASL_SSL协议接入。

JAAS 配置文件

创建 jaas 配置文件 kafka_client_jaas.conf

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule sufficient
    username="alice"
    password="alice1234!";
};

注:username,password 填入创建用户时设置的用户名及密码。

证书文件:client.truststore.jks

下载证书文件: client.truststore.jks

配置文件:kafka.properties

提供接入 Kafka 服务需要的配置信息,配置项如下:

配置项 配置内容
bootstrap.servers 接入点地址
security.protocol SASL_SSL
ssl.truststore.location 配置为 client.truststore.jks 文件路径
例如:/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password bms@kafka
ssl.endpoint.identification.algorithm 固定为空
sasl.mechanism 固定为 SCRAM-SHA-512
bootstrap.servers=<接入点地址>

security.protocol=SASL_SSL

ssl.truststore.location=/etc/kafka/ssl/client.truststore.jks
ssl.truststore.password=bms@kafka
ssl.endpoint.identification.algorithm=
ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
ssl.protocol = TLSv1.2

sasl.mechanism=SCRAM-SHA-512

生产者

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'

kafka-console-producer.sh --bootstrap-server <接入点地址> --topic <topic>  --producer.config kafka.properties

消费者

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf'

kafka-console-consumer.sh --bootstrap-server <接入点地址> --topic <topic>  --consumer.config
 kafka.properties --from-beginning

相似文档
  • 计费方式说明: 具体的计费价格请参见 价格详情页。 计费项: 百度智能云消息服务 for Kafka专享版服务计费项如下: Kafka实例下各节点的配置规格。 各规格均为1:4的vCPU内存比(较高的内存比可供PageCache提升读写效率)。
  • 续费定义: 预付费实例:增加使用时间。 后付费实例:无到期时间,无需续费。 续费前:请保证账户余额充足,余额>100元。 续费说明: 消息服务 for Kafka专享版支持手动续费和自动续费两种续费方式,用户可以根据不同的应用场景选择合适的续费方式,推荐使用自动续费。
  • 用户可以根据需要变更实例配置,具体的变配及计费说明如下: 支持升级配置,不支持降级配置。 以天为最小计费单位,不足一天按一天计。 变更配置实例是否享受预付费折扣,以剩余时间长度是否满1年、2年、3年为准。
  • 预付费处理方法: 余额不足提醒: 在实例到期前7天,系统将给您发送“即将到期”通知。 欠费处理方法: 实例到期后停服,数据为您保留7天。停服期间不收取费用,系统将给您发送续费通知。 若实例到期时间超过7天,实例将被释放,数据无法恢复。 在释放实例前1天和释放实例时,系统将给您发送释放实例通知。
  • 方案一: 先将生产消息的业务迁移到新的Kafka,原Kafka不会有新的消息生产。待原有Kafka实例的消息全部消费完成后,再将消费消息业务迁移到新的Kafka,开始消费新Kafka实例的消息。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部