上云无忧 > 文档中心 > 天翼云分布式消息服务Kafka Java生产者SDK接口
分布式消息服务Kafka
天翼云分布式消息服务Kafka Java生产者SDK接口

文档简介:
生产者接口 org.apache.kafka.clients.producer.KafkaProducer 1)发送消息,返回值为Future public Future send(ProducerRecord record); public Future send(ProducerRecord record, Callback callback);
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

生产者接口

org.apache.kafka.clients.producer.KafkaProducer
1)发送消息,返回值为Future<RecordMetadata>
public Future<RecordMetadata> send(ProducerRecord<K, V> record);

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);


生产者实例化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);


加密生产者实例化

Properties props = new Properties();
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "ctg_kafka");
props.put("bootstrap.servers", "sasl_address,ex: localhost:8092");  // 安全接入点地址
props.put("acks", "0");
props.put("retries", 3);
props.put("batch.size", 1684);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("request.timeout.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);


生产者Property说明

常量字段

说明

bootstrap.servers

用于建立初始连接到kafka集群的"主机/端口对"配置列表

acks

•  acks=0   如果设置为0,则 producer   不会等待服务器的反馈。

•  acks=1   如果设置为1,leader节点会将记录写入本地日志,并且在所有   follower 节点反馈之前就先确认成功。

•  acks=all   如果设置为all,这就意味着   leader 节点会等待所有同步中的副本确认之后再确认这条记录是否发送完成。

retries

若设置大于0的值,则客户端会将发送失败的记录重新发送

batch.size

当将多个记录被发送到同一个分区时,   Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。

linger.ms

producer 会将两个请求发送时间间隔内到达记录合并到一个单独批处理请求中

buffer.memory

Producer   用来缓冲等待被发送到服务器的记录的总字节数。

key.serializer

关键字的序列化类,实现以下接口: org.apache.kafka.common.serialization.Serializer   接口。

value.serializer

值的序列化类,实现以下接口:   org.apache.kafka.common.serialization.Serializer 接口。



方法调用说明

(1)创建&连接

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);;

(2)关闭连接

producer.close();

(3)发送消息

函数名:send
功能描述:发送消息,设置message相关属性,将消息发送至broker
入参说明:

参数

类型

是否可为空

说明

record

ProducerRecord

N

消息记录

输出说明:

参数

类型

说明

result

Future<RecordMetadata>

包含消息时间戳,序号,和将要发送到的分区,

使用例子:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record);


(4)发送消息[包含回调]

函数名:send
功能描述:发送消息,设置message相关属性,将消息发送至broker 
入参说明:

参数

类型

是否可为空

说明

record

ProducerRecord

N

消息

callback

Callback

N

回调实例

输出说明:

参数

类型

说明

result

Future<RecordMetadata>

包含消息时间戳,序号,和将要发送到的分区,

使用例子:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes(); ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
        if (paramRecordMetadata == null) {
            System.out.println("paramRecordMetadata is null ");
            paramException.printStackTrace();
            return;
       }
        System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
     }
});



代码示例

非加密方式:

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

加密方式:
// 加入加密密钥路径配置
System.setProperty("java.security.krb5.conf", "path/to/krb5.conf");
System.setProperty("java.security.auth.login.config", "path/to/kafka_client.conf");
System.setProperty("sun.security.krb5.debug", "true");
Properties props = new Properties();
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "GSSAPI");
props.put("sasl.kerberos.service.name", "ctg_kafka");
props.put("bootstrap.servers", "sasl_address, ex: localhost:8092");  // 安全接入点地址
props.put("acks", "0");
props.put("retries", 3);
props.put("batch.size", 1684);
props.put("linger.ms", 100);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("request.timeout.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord record = new ProducerRecord<>("ppx", "pps200"+index++, dvalue);
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
        if (paramRecordMetadata == null) {
            System.out.println("paramRecordMetadata is null ");
            paramException.printStackTrace();
            return;
        }
        System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
    }
});


相似文档
  • 接口方法概述 (1)消费者接口 org.apache.kafka.clients.producer. KafkaConsumer 1) 拉取消息,返回值为ConsumerRecords ConsumerRecords poll(long timeout) 创建消费者 (1)消费者实例化
  • 本文主要介绍消息队列 Kafka 发布者的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。
  • Kafka 的消费者示例代码片段如下: Properties properties = new Properties(); properties.put("bootstrap.servers", "192.168.90.139:8090,192.168.90.41:8090,192.168.90.42:8090"); properties.put("group.id", "ppsgroup"); properties.put("enable.auto.commit", "true");
  • 分布式消息Kafka使用SASL 认证协议来实现身份验证的能力,加密的主题需要经过身份校验才能正常地消费和生产消息。 创建用户并配置用户权限 进入应用用户管理界面,新建用户,并给用户添加主题和消费组的权限,并且下载密钥。详细操作可以查看用户指南。
  • Q:接入地址出现不足三个ip+端口: A:问题:集群三台机器正常运作的情况下,接入点会出现三个ip:port连起来,当出现不足三个时候,证明其中一台机器不正常工作(没出现在接入点的机器)。 解决:尽快联系管理人员查看不正常工作的节点,尽快恢复。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部