上云无忧 > 文档中心 > 天翼云分布式消息服务Kafka发布者实践
分布式消息服务Kafka
天翼云分布式消息服务Kafka发布者实践

文档简介:
本文主要介绍消息队列 Kafka 发布者的最佳实践,从而帮助您更好的使用该产品。 文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。
*产品来源:中国电信天翼云。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠

本文主要介绍消息队列 Kafka 发布者的最佳实践,从而帮助您更好的使用该产品。

文中的最佳实践基于消息队列 Kafka 的 Java 客户端;对于其它语言的客户端,其基本概念与思想是通用的,但实现细节可能有差异,仅供参考。

Kafka 的发送非常简单,示例代码片段如下:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.90.139:8090");  //kafka server  ,192.168.1.159:8091
props.put("acks", "all");
props.put("retries", 1);
props.put("batch.size", 1684);
props.put("linger.ms", 0);
props.put("buffer.memory", 33554432); // buffer空间32M
props.put("request.timeout.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
int index = 0;
while(true) {
    String dvalue = "hello " ;
    ProducerRecord record = new ProducerRecord<>("pps", "pps"+index++, dvalue);
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata paramRecordMetadata, Exception paramException) {
            if (paramRecordMetadata == null) {
                System.out.println("paramRecordMetadata is null ");
                paramException.printStackTrace();
                return;
            }
            System.out.println("发送的消息信息 " + paramRecordMetadata.topic() + ", partition:" + paramRecordMetadata.partition());
        }
    });
    TimeUnit.SECONDS.sleep(1);
}

Key 和 Value

Kafka 0.10.0.0 的消息字段只有两个:Key 和 Value。Key 是消息的标识,Value 即消息内容。为了便于追踪,重要消息最好都设置一个唯一的 Key。通过 Key 追踪某消息,打印发送日志和消费日志,了解该消息的发送和消费情况。

失败重试
在分布式环境下,由于网络等原因,偶尔的发送失败是常见的。导致这种失败的原因有可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。

消息队列 Kafka 是 VIP 网络架构,会主动掐掉空闲连接(30 秒没活动),也就是说,不是一直活跃的客户端会经常收到 “connection rest by peer” 这样的错误,因此建议都考虑重试消息发送。

异步发送
发送接口是异步的;如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。

线程安全
Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,一个应用对应一个 Producer 就足够了。

Acks
Acks的说明如下:
acks=0,表示无需服务端的 Response,性能较高,丢数据风险较大;

acks=1,服务端主节点写成功即返回 Response,性能中等,丢数据风险中等,主节点宕机可能导致数据丢失;

acks=all,服务端主节点写成功,且备节点同步成功,才返回 Response,性能较差,数据较为安全,主节点和备节点都宕机才会导致数据丢失。

一般建议选择 acks=1,重要的服务可以设置 acks=all。

Batch
Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 Batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。
在构建 Producer 时,需要考虑以下两个参数:

batch.size : 发往每个分区(Partition)的消息缓存量(消息内容的字节数之和,不是条数)达到这个数值时,就会触发一次网络请求,然后客户端把消息真正发往服务器;
linger.ms : 每条消息待在缓存中的最长时间。若超过这个时间,就会忽略 batch.size 的限制,然后客户端立即把消息发往服务器。
由此可见,Kafka 客户端什么时候把消息真正发往服务器,是通过上面两个参数共同决定的:
batch.size 有助于提高吞吐,linger.ms有助于控制延迟。您可以根据具体业务需求进行调整。

OOM
结合 Kafka 的 Batch 设计思路,Kafka 会缓存消息并打包发送,如果缓存太多,则有可能造成 OOM(Out of Memory)。

buffer.memory : 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略 batch.size 和 linger.ms 的限制。
buffer.memory 的默认数值是 32 MB,对于单个 Producer 来说,可以保证足够的性能。需要注意的是,如果你在同一个 JVM 中启动多个 Producer,那么每个 Producer 都有可能占用 32 MB 缓存空间,此时便有可能触发 OOM。
 在生产时,一般没有必要启动多个 Producer;如果特殊情况需要,则需要考虑buffer.memory的大小,避免触发 OOM。


相似文档
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部