Document overview:
Producer interface
org.apache.kafka.clients.producer.KafkaProducer
1) Send a message; the return value is Future<RecordMetadata>
public Future<RecordMetadata> send(ProducerRecord<K, V> record);
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
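Both overloads enqueue the record and return immediately with a Future<RecordMetadata>; the Callback form is notified once the broker acknowledges the record or the send fails. A minimal usage sketch, assuming a producer and record built as shown in the sections below:
// Fire-and-forget: send() does not block; the Future can be kept for later inspection.
Future<RecordMetadata> result = producer.send(record);

// Callback form: exactly one of metadata / exception is non-null when onCompletion fires.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // handle success (metadata) or failure (exception) here
    }
});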
Producer instantiation
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
Encrypted (SASL) producer instantiation
Properties props = new Properties();
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "ctg_kafka");
props.put("bootstrap.servers", "sasl_address,ex: localhost:8092"); // 安全接入点地址
props.put("acks", "0");
props.put("retries", 3);
props.put("batch.size", 1684);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("request.timeout.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
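As the full encrypted example at the end of this document shows, the Kerberos and JAAS configuration files must be registered as JVM system properties before the producer is constructed. A minimal sketch; the paths are placeholders:
System.setProperty("java.security.krb5.conf", "path/to/krb5.conf");                  // Kerberos realm/KDC settings
System.setProperty("java.security.auth.login.config", "path/to/kafka_client.conf");  // JAAS login configuration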
Producer property descriptions
Configuration key | Description
bootstrap.servers | A list of host/port pairs used to establish the initial connection to the Kafka cluster.
acks | acks=0: the producer does not wait for any acknowledgment from the server. acks=1: the leader writes the record to its local log and acknowledges without waiting for all followers. acks=all: the leader waits until the full set of in-sync replicas has acknowledged the record before confirming the send.
retries | A value greater than 0 causes the client to resend any record whose send failed.
batch.size | When multiple records are sent to the same partition, the producer batches them into fewer requests, which improves performance on both the client and the server. This setting controls the default batch size in bytes.
linger.ms | Records that arrive within this delay are grouped into a single batched request.
buffer.memory | The total number of bytes of memory the producer uses to buffer records waiting to be sent to the server.
key.serializer | Serializer class for the key; must implement the org.apache.kafka.common.serialization.Serializer interface.
value.serializer | Serializer class for the value; must implement the org.apache.kafka.common.serialization.Serializer interface.
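The string keys above are also exposed as constants on org.apache.kafka.clients.producer.ProducerConfig. A sketch equivalent to the plaintext instantiation earlier in this document, using the constants instead of raw strings (StringSerializer is org.apache.kafka.common.serialization.StringSerializer):
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(props);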
Method call descriptions
(1) Create & connect
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
(2) Close the connection
producer.close();
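Because send() only buffers records, it is good practice to flush before closing; a minimal sketch:
producer.flush();   // block until all buffered records have been sent
producer.close();   // waits for in-flight requests to complete, then releases network and memory resources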
(3) Send a message
Method name: send
Description: sends a message to the broker, with its attributes set on the record
Input parameters:
Parameter | Type | Nullable | Description
record | ProducerRecord | N | The message record
Output:
Parameter | Type | Description
result | Future<RecordMetadata> | Contains the record's timestamp, offset, and the partition it is sent to
Example:
String key = "key";
String value = "value";
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", key, value);
producer.send(record);
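Blocking on the returned Future makes the send synchronous and exposes the RecordMetadata fields listed above. A minimal sketch, assuming the producer and record from this example (requires java.util.concurrent.ExecutionException and org.apache.kafka.clients.producer.RecordMetadata):
try {
    RecordMetadata metadata = producer.send(record).get();   // blocks until the broker acknowledges the record
    System.out.println("topic=" + metadata.topic()
            + ", partition=" + metadata.partition()
            + ", offset=" + metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();                                      // the send failed or the wait was interrupted
}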
(4) Send a message [with callback]
Method name: send
Description: sends a message to the broker, with its attributes set on the record; the callback is invoked when the send completes
Input parameters:
Parameter | Type | Nullable | Description
record | ProducerRecord | N | The message record
callback | Callback | N | The callback instance
Output:
Parameter | Type | Description
result | Future<RecordMetadata> | Contains the record's timestamp, offset, and the partition it is sent to
Example:
String key = "key";
String value = "value";
ProducerRecord<String, String> record = new ProducerRecord<String, String>("my-topic", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
if (paramRecordMetadata == null) {
System.out.println("paramRecordMetadata is null ");
paramException.printStackTrace();
return;
}
System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
}
});
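Callback has a single method, so a Java 8 lambda can replace the anonymous class above; a sketch equivalent to the example:
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();   // the send failed
        return;
    }
    System.out.println("Record sent: " + metadata.topic() + ", partition:" + metadata.partition());
});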
Code examples
Plaintext (non-encrypted) mode:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close(); // flush any buffered records and release resources
Encrypted (SASL/Kerberos) mode:
// Register the Kerberos configuration file paths
System.setProperty("java.security.krb5.conf", "path/to/krb5.conf");
System.setProperty("java.security.auth.login.config", "path/to/kafka_client.conf");
System.setProperty("sun.security.krb5.debug", "true");
Properties props = new Properties();
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "ctg_kafka");
props.put("bootstrap.servers", "sasl_address, ex: localhost:8092"); // 安全接入点地址
props.put("acks", "0");
props.put("retries", 3);
props.put("batch.size", 1684);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("request.timeout.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
int index = 0;            // placeholder counter used to build the example key
String dvalue = "value";  // placeholder payload
ProducerRecord<String, String> record = new ProducerRecord<>("ppx", "pps200" + index++, dvalue);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
if (paramRecordMetadata == null) {
System.out.println("paramRecordMetadata is null ");
paramException.printStackTrace();
return;
}
System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
}
});