上云无忧 > 文档中心 > 腾讯云消息队列 Pulsar 版 - 客户端连接与生产消费者
消息队列 Pulsar 版
腾讯云消息队列 Pulsar 版 - 客户端连接与生产消费者

文档简介:
本文主要介绍 TDMQ Pulsar 客户端与连接、客户端与生产/消费者之间的关系,并向开发者介绍客户端合理的使用方式,以便更高效、稳定地使用 TDMQ Pulsar 版的服务。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠
核心原则:
一个进程一个 PulsarClient 即可。
Producer、Consumer 是线程安全的,对于同一个 Topic,可以复用且最好复用。

客户端与连接

TDMQ Pulsar 客户端(以下简称 PulsarClient )是应用程序连接到 TDMQ Pulsar 版的一个基本单位,一个 PulsarClient 对应一个 TCP 连接。一般来说,用户侧的一个应用程序或者进程对应使用一个 PulsarClient,有多少个应用节点,对应就有多少个 Client 数量。若长时间不使用 TDMQ Pulsar 版服务的应用节点,应回收 Client 以节省资源消耗(当前 TDMQ Pulsar 版的连接上限是单个 Topic 200个 Client 连接)。
说明:
如果业务侧 Topic 数量较多,确实需要创建多个 Client 可以使用如下方式复用 Client 对象:
1. 同一个 Topic 的多个 Producer 或者 Consumer 分别复用同一个 Client 对象
2. 如果【1】仍不满足,可以尝试多个 Topic 复用同一个 Client 对象
3. 此限制只针对共享集群,在专业集群中,默认配置仍为 200,但可以根据用户的实际需求调整该限制。

客户端与生产/消费者

一个 Client 下可以创建多个生产和消费者,用于提升生产和消费的速度。比较常见的用法是,一个 Client 下,利用多线程创建多个 Producer 或 Consumer 对象,用于生产消费,不同 Producer 和 Consumer 之间数据相互隔离。
当前 TDMQ Pulsar 版对生产/消费者的限制为:
单个 Topic 生产者上限1000个。
单个 Topic 消费者上限2000个。

最佳实践

生产/消费者的数量不一定取决于业务对象,它们是一个可以复用的资源,通过名称作为唯一标识进行区分。

生产者

假设有1000个业务对象在同时生产消息,并不是要创建1000个 Producer,只要是向同一个 Topic 进行投递,每个应用节点可以先统一使用一个 Producer 来进行生产(单例模式),往往单个 Producer 就能吃满单个应用节点的硬件配置。
以下给出一段 Java 消息生产的代码示例。
				
//从配置文件中获取 serviceURL 接入地址、Token 密钥、Topic 全名和 Subscription 名称(均可从控制台复制)
@Value("${tdmq.serviceUrl}")
private String serviceUrl;
@Value("${tdmq.token}")
private String token;
@Value("${tdmq.topic}")
private String topic;
//声明1个 Client 对象、producer 对象
private PulsarClient pulsarClient;
private Producer<String> producer;
//在一段初始化程序中创建好客户端和生产者对象
public void init() throws Exception {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();
}
在实际生产消息的业务逻辑中直接引用 producer 完成消息的发送。
				
//在实际生产消息的业务逻辑中直接引用,注意 Producer 通过范式声明的 Schema 类型要和传入对象匹配
public void onProduce(Producer<String> producer){
//添加业务逻辑
String msg = "my-message";//模拟从业务逻辑拿到消息
try {
//TDMQ Pulsar 版默认开启 Schema 校验, 消息对象一定需要和 producer 声明的 Schema 类型匹配
MessageId messageId = producer.newMessage()
.key("msgKey")
.value(msg)
.send();
System.out.println("delivered msg " + msgId + ", value:" + value);
} catch (PulsarClientException e) {
System.out.println("delivered msg failed, value:" + value);
e.printStackTrace();
}
}
public void onProduceAsync(Producer<String> producer){
//添加业务逻辑
String msg = "my-asnyc-message";//模拟从业务逻辑拿到消息
//异步发送消息,无线程阻塞,提升发送速率
CompletableFuture<MessageId> messageIdFuture = producer.newMessage()
.key("msgKey")
.value(msg)
.sendAsync();
//通过异步回调得知消息发送成功与否
messageIdFuture.whenComplete(((messageId, throwable) -> {
if( null != throwable ) {
System.out.println("delivery failed, value: " + msg );
//此处可以添加延时重试的逻辑
} else {
System.out.println("delivered msg " + messageId + ", value:" + msg);
}
}));
}
当一个生产者长时间不使用时需要调用 close 方法关闭,以避免占用资源;当一个客户端实例长时间不使用时,同样需要调用 close 方法关闭,以避免连接池被占满。
				
public void destroy(){
if (producer != null) {
producer.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
}

消费者

如同生产者,消费者也最好按照单例模式进行使用,单个消费节点只需要一个客户端实例以及一个消费者实例。一般来说,一个消息队列的消费端的性能瓶颈都在于消费者按照自己业务逻辑处理消息的过程,而并非在接收消息的动作上。所以当出现了消费性能不足的时候,先看消费者的网络带宽消耗,如果趋势上看没有达到一个明显的上限,就应该先根据日志以及消息轨迹信息分析自身处理消息的业务逻辑耗时。
注意
当使用 Shared 或者 Key-Shared 模式时,消费者数量不一定小于等于分区数。服务端会有一个负责分发消息的模块按照一定的方式(Shared 模式默认是轮询,Key-Shared 则是在同一个 key 内轮询)将消息分发给所有的消费者。
当使用 Shared模式,如果生产侧暂停了生产,则到了末尾一部分消息时,可能会出现消费分布不均的情况。
使用多线程消费,即使复用一个 consumer 对象,消息的顺序也将无法得到保证。
以下给出一个 Java 基于 Spring boot 框架用线程池进行多线程消费的完整代码示例。
				
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Service
public class ConsumerService implements Runnable {
//从配置文件中获取 serviceURL 接入地址、Token 密钥、Topic 全名和 Subscription 名称(均可从控制台复制)
@Value("${tdmq.serviceUrl}")
private String serviceUrl;
@Value("${tdmq.token}")
private String token;
@Value("${tdmq.topic}")
private String topic;
@Value("${tdmq.subscription}")
private String subscription;
private volatile boolean start = false;
private PulsarClient pulsarClient;
private Consumer<String> consumer;
private static final int corePoolSize = 10;
private static final int maximumPoolSize = 10;
private ExecutorService executor;
private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
@PostConstruct
public void init() throws Exception {
pulsarClient = PulsarClient.builder()
.serviceUrl(serviceUrl)
.authentication(AuthenticationFactory.token(token))
.build();
consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
//.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subscription)
.subscribe();
executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy());
start = true;
}
@PreDestroy
public void destroy() throws Exception {
start = false;
if (consumer != null) {
consumer.close();
}
if (pulsarClient != null) {
pulsarClient.close();
}
if (executor != null) {
executor.shutdownNow();
}
}
@Override
public void run() {
logger.info("tdmq consumer started...");
for (int i = 0; i < maximumPoolSize; i++) {
executor.submit(() -> {
while (start) {
try {
Message<String> message = consumer.receive();
if (message == null) {
continue;
}
onConsumer(message);
} catch (Exception e) {
logger.warn("tdmq consumer business error", e);
}
}
});
}
logger.info("tdmq consumer stopped...");
}
/**
* 这里写消费业务逻辑
*
* @param message
* @return return true: 消息ack return false: 消息nack
* @throws Exception 消息nack
*/
private void onConsumer(Message<String> message) {
//业务逻辑,延时类操作
try {
System.out.println(Thread.currentThread().getName() + " - message receive: " + message.getValue());
Thread.sleep(1000);//模拟业务逻辑处理
consumer.acknowledge(message);
logger.info(Thread.currentThread().getName() + " - message processing succeed:" + message.getValue());
} catch (Exception exception) {
consumer.negativeAcknowledge(message);
logger.error(Thread.currentThread().getName() + " - message processing failed:" + message.getValue());
}
}
}
相似文档
  • 操作场景: 集群是 TDMQ Pulsar 版中的一个资源维度,不同集群的命名空间、Topic、角色权限等完全隔离。每个集群会有集群的资源限制例如 Topic 总数、消息保留时长等。常见的使用方式如:开发测试环境使用一个专门集群,生产环境使用一个专门的集群。
  • 操作场景: 如当前的集群规格不满足您的业务需求,您可以在控制台上提升您的集群规格和存储规格。 说明: 当前仅专业集群支持升级集群规格,虚拟集群暂不支持升配。升级过程中理论上对业务无感,但随着服务端 Rebalance Topic 流量时,客户端可能会出现断开重连的现象。
  • 操作场景: 用户不再需要 TDMQ Pulsar 版集群时,可以删除该集群。 TDMQ Pulsar 版集群的生命周期是指集群从启动到释放所经历的状态。通过对集群从启动到删除期间的合理的管理,可确保运行于集群上的应用程序能高效经济地提供服务。
  • 操作场景: 本文档可以指导您使用 TDMQ Pulsar 版时,将自己当前私有网络 VPC 中的资源和 TDMQ Pulsar 版开通互访,以保证您部署的生产者/消费者客户端能正常和 TDMQ Pulsar 版通信。
  • 背景: TDMQ Pulsar 提供的 VPC 接入点,只支持同地域 VPC 网络的客户端接入。对于通过专线打通或者跨地域访问的用户,无法通过 VPC 接入点直接访问。 如果是上述接入场景的用户,需要用户自己配置接入点的域名解析,来进行接入。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部