上云无忧 > 文档中心 > 天翼云分布式消息服务Kafka Java消费者SDK接口
分布式消息服务Kafka
天翼云分布式消息服务Kafka Java消费者SDK接口

文档简介:
接口方法概述 (1)消费者接口 org.apache.kafka.clients.producer. KafkaConsumer 1) 拉取消息,返回值为ConsumerRecords ConsumerRecords poll(long timeout) 创建消费者 (1)消费者实例化
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

接口方法概述

(1)消费者接口

org.apache.kafka.clients.producer. KafkaConsumer
1) 拉取消息,返回值为ConsumerRecords<K, V>
ConsumerRecords<K, V> poll(long timeout)


创建消费者

(1)消费者实例化

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

(2)加密消费者实例化

Properties properties = new Properties();
properties.put("bootstrap.servers", "sasl_address, ex: localhost:8092"); // 安全接入地址
properties.put("group.id", "ppxgroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.offset.reset", "earliest");//earliest
properties.put("max.poll.records", 1);//earliest
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "GSSAPI");
properties.put("sasl.kerberos.service.name", "ctg_kafka");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);

(3)消费者Property说明

常量字段

说明

bootstrap.servers

用于建立初始连接到kafka集群的"主机/端口对"配置列表

acks

•  acks=0   如果设置为0,则 producer   不会等待服务器的反馈。

•  acks=1   如果设置为1,leader节点会将记录写入本地日志,并且在所有   follower 节点反馈之前就先确认成功。

•  acks=all   如果设置为all,这就意味着   leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。

retries

若设置大于0的值,则客户端会将发送失败的记录重新发送

batch.size

当将多个记录被发送到同一个分区时,   Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。

linger.ms

producer 会将两个请求发送时间间隔内到达的记录合并到一个单独的批处理请求中。

buffer.memory

Producer   用来缓冲等待被发送到服务器的记录的总字节数。

key.serializer

关键字的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer   接口。

value.serializer

值的序列化类,实现以下接口:   org.apache.kafka.common.serialization.Serializer 接口。

方法调用说明

(1)创建&连接

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

(2)拉取消息

函数名:poll
功能描述:接受消息
入参说明:

参数

类型

是否可为空

说明

timeout

long

N

超时时间

输出说明:

参数

类型

说明

result

ConsumerRecords<K, V>

拉取消息

使用例子:

ConsumerRecords<String, String> records = consumer.poll(100);

代码示例

非加密方式:
Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("group.id", "test");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "1000");
 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
 consumer.subscribe(Arrays.asList("foo", "bar"));
 while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records)
         System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
 }

加密方式:

System.setProperty("java.security.krb5.conf", "path/to/krb5.conf");
System.setProperty("java.security.auth.login.config", "path/to/kafka_client.conf");
System.setProperty("sun.security.krb5.debug", "true");

Properties properties = new Properties();
properties.put("bootstrap.servers", "sasl_address, ex: localhost:8092"); // 安全接入地址
properties.put("group.id", "ppxgroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.offset.reset", "earliest");//earliest
properties.put("max.poll.records", 1);//earliest
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "GSSAPI");
properties.put("sasl.kerberos.service.name", "ctg_kafka");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("ppx"));
        /* 读取数据,读取超时时间为100ms */

ConsumerRecords<Object, Object> records = consumer.poll(100);
records.forEach(record->{
     String format = String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     System.out.println("================================="+format);
});

相似文档
  • 本文主要介绍消息队列 Kafka 发布者的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。
  • Kafka 的消费者示例代码片段如下: Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.90.139:8090,192.168.90.41:8090,192.168.90.42:8090"); properties.put("group.id", "ppsgroup"); properties.put("enable.auto.commit", "true");
  • 分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。 创建用户并配置用户权限 进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
  • Q:接入地址出现不足三个ip+端口: A:问题:集群三台机器正常运作的情况下,接入点会出现三个ip:port连起来,当出现不足三个时候,证明其中一台机器不正常工作(没出现在接入点的机器)。 解决:尽快联系管理人员查看不正常工作的节点,尽快恢复。
  • 天翼云分布式消息服务Kafka产品用户使用指南.pdf
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部