Document overview:
Interface overview
(1) Consumer interface
org.apache.kafka.clients.consumer.KafkaConsumer
1) Poll for messages; returns ConsumerRecords<K, V>
ConsumerRecords<K, V> poll(long timeout)
Creating a consumer
(1) Consumer instantiation
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
(2) Encrypted (SASL) consumer instantiation
Properties properties = new Properties();
properties.put("bootstrap.servers", "sasl_address, ex: localhost:8092"); // secure (SASL) access address
properties.put("group.id", "ppxgroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.offset.reset", "earliest"); // start from the earliest offset when no committed offset exists
properties.put("max.poll.records", 1); // return at most one record per poll
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "GSSAPI");
properties.put("sasl.kerberos.service.name", "ctg_kafka");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
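GSSAPI (Kerberos) authentication also requires a JAAS login configuration, referenced via the java.security.auth.login.config system property shown in the full encrypted example below. A minimal sketch of such a file; the keytab path and principal are placeholders for your environment:

```
KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/path/to/client.keytab"
    principal="client@EXAMPLE.COM";
};
```

The sasl.kerberos.service.name setting ("ctg_kafka" above) must match the service name of the broker's Kerberos principal.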
(3) Consumer property descriptions

| Config key | Description |
| --- | --- |
| bootstrap.servers | List of host/port pairs used to establish the initial connection to the Kafka cluster. |
| group.id | Unique identifier of the consumer group this consumer belongs to. |
| enable.auto.commit | If true, the consumer's offsets are committed automatically in the background. |
| auto.commit.interval.ms | Interval in milliseconds between automatic offset commits (when enable.auto.commit is true). |
| auto.offset.reset | Where to start reading when no committed offset exists: earliest (beginning of the partition) or latest (end of the partition). |
| max.poll.records | Maximum number of records returned in a single call to poll(). |
| key.deserializer | Deserializer class for record keys, implementing the org.apache.kafka.common.serialization.Deserializer interface. |
| value.deserializer | Deserializer class for record values, implementing the org.apache.kafka.common.serialization.Deserializer interface. |
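To avoid repeating the string keys above across examples, the common settings can be collected in one place. A minimal sketch using only java.util.Properties; the class and method names are illustrative, not part of the Kafka API:

```java
import java.util.Properties;

// Illustrative helper that bundles the consumer settings described above.
public class ConsumerConfigDefaults {

    // Returns the baseline consumer configuration used throughout this document.
    public static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = defaults();
        System.out.println("bootstrap.servers = " + props.getProperty("bootstrap.servers"));
    }
}
```

The returned Properties object can be passed directly to new KafkaConsumer<>(ConsumerConfigDefaults.defaults()).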
Method call reference
(1) Create & connect
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
(2) Polling messages
Method name: poll
Description: fetches records from the subscribed topics
Input parameters:

| Parameter | Type | Nullable | Description |
| --- | --- | --- | --- |
| timeout | long | N | Maximum time in milliseconds to block waiting for records |

Output:

| Parameter | Type | Description |
| --- | --- | --- |
| result | ConsumerRecords<K, V> | Records fetched in this call |

Example:
ConsumerRecords<String, String> records = consumer.poll(100);
Code examples
Non-encrypted:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(),record.key(), record.value());
}
Encrypted (SASL/GSSAPI):
System.setProperty("java.security.krb5.conf", "path/to/krb5.conf");
System.setProperty("java.security.auth.login.config", "path/to/kafka_client.conf");
System.setProperty("sun.security.krb5.debug", "true");
Properties properties = new Properties();
properties.put("bootstrap.servers", "sasl_address, ex: localhost:8092"); // secure (SASL) access address
properties.put("group.id", "ppxgroup");
properties.put("enable.auto.commit", "true");
properties.put("auto.offset.reset", "earliest"); // start from the earliest offset when no committed offset exists
properties.put("max.poll.records", 1); // return at most one record per poll
properties.put("security.protocol", "SASL_PLAINTEXT");
properties.put("sasl.mechanism", "GSSAPI");
properties.put("sasl.kerberos.service.name", "ctg_kafka");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<Object, Object> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList("ppx"));
/* Poll for records with a 100 ms timeout */
ConsumerRecords<Object, Object> records = consumer.poll(100);
records.forEach(record->{
String format = String.format("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
System.out.println("================================="+format);
});