上云无忧 > 文档中心 > 腾讯云消息队列 Pulsar 版 - 订阅模式
消息队列 Pulsar 版
腾讯云消息队列 Pulsar 版 - 订阅模式

文档简介:
为了适用不同场景的需求,Pulsar 支持四种订阅模式:Exclusive、Shared、Failover、Key_Shared。
*此产品及展示信息均由腾讯云官方提供。免费试用 咨询热线:400-826-7010,为您提供专业的售前咨询,让您快速了解云产品,助您轻松上云! 微信咨询
  免费试用、价格特惠
为了适用不同场景的需求,Pulsar 支持四种订阅模式:Exclusive、Shared、Failover、Key_Shared。

独占模式(Exclusive)

Exclusive 独占模式(默认模式):一个 Subscription 只能与一个 Consumer 关联,只有这个 Consumer 可以接收到 Topic 的全部消息,如果该 Consumer 出现故障了就会停止消费。
Exclusive 订阅模式下,同一个 Subscription 里只有一个 Consumer 能消费 Topic,如果多个 Consumer 订阅则会报错,适用于全局有序消费的场景。

		
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为exclusive(独占)模式
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
启动多个消费者将收到错误信息如下图所示:

共享模式(Shared)

消息通过 round robin 轮询机制(也可以自定义)分发给不同的消费者,并且每个消息仅会被分发给一个消费者。当消费者断开连接,所有被发送给他,但没有被确认的消息将被重新安排,分发给其它存活的消费者。

		
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为 Shared(共享)模式
.subscriptionType(SubscriptionType.Shared)
.subscribe();
多个 Shared 模式消费者如下图所示:

灾备模式(Failover)

当存在多个 consumer 时,将会按字典顺序排序,第一个 consumer 被初始化为唯一接受消息的消费者。当第一个 consumer 断开时,所有的消息(未被确认和后续进入的)将会被分发给队列中的下一个 consumer。

		
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为灾备模式
.subscriptionType(SubscriptionType.Failover)
.subscribe();
多个 Failover 模式消费者如下图所示:

KEY 共享模式(Key_Shared)

当存在多个 Consumer 时,将根据消息的 Key 进行分发,Key 相同的消息只会被分发到同一个消费者。

注意:
Key_Shared 本身在使用上存在一定的限制条件,由于其工程实现复杂度较高,在社区版本迭代中,不断有对 Key_Shared 的功能进行改进以及优化,整体稳定性相较 Exclusive,Failover 和 Shared 这三种订阅类型偏弱。如果上述三种订阅类型能满足业务需要,可以优先选用上述三种订阅类型。
专业集群可以保证相同 KEY 的消息按顺序投递;虚拟集群无法保障消息投递顺序。

Key_Shared 使用建议

什么时候才考虑用 Key_Shared 订阅模式

如是普通的生产消费场景,建议直接选用 Shared 模式即可。
若需要让相同 Key 的消息分给同一个消费者,这个时候 Shared 订阅模式无法满足用户需求。有两种方式可以选择:
选择 Key_Shared 订阅模式。
通过多分区主题 + Failover 订阅模式实现。

什么场景下适合用 Key_Shared 订阅

Key 数量多且每个 Key 的消息分布相对均匀
消费处理速度快,无消息堆积的情况
如果在生产过程中不能保证上面的两个条件同时满足,建议用 【多分区主题 + Failover 订阅】

代码示例

Key_Shared 订阅示例

默认情况下,Pulsar 在生产消息时是开启 Batch 功能的,Pulsar 的 Batch 消息解析是在 Consumer 侧处理的。所以在 Broker 侧一个 Batch 消息是被当作一条 Entry 处理的,所以对于 Key_shared 的基于消息 Key 有序订阅类型来说,是没办法处理这种 Case 的,因为不同 Key 的消息有可能被打包到同一个 Batch 中。针对这种情况在创建 Producer 时有如下两种规避方式:
1. 禁用 Batch。
		
// 构建生产者
Producer<byte[]> producer pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
// 发送消息时设置key
MessageId msgId = producer.newMessage()
// 消息内容
.value(value.getBytes(StandardCharsets.UTF_8))
// 在此处设置key,key相同的消息只会被分发到同一个消费者。
.key("youKey1")
.send();
2. 使用 key_based batch 类型。
		
// 构建生产者
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(true)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
// 发送消息时设置key
MessageId msgId = producer.newMessage()
// 消息内容
.value(value.getBytes(StandardCharsets.UTF_8))
// 在此处设置key,key相同的消息只会被分发到同一个消费者。
.key("youKey1")
.send();
消费者代码示例:

		

// 构建消费者 Consumer<byte[]> consumer = pulsarClient.newConsumer() // topic完整路径,

格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制 .

topic("persistent://pulsar-xxx/sdk_java/topic1") // 需要在控制台Topic详情页创建好一个订阅,

此处填写订阅名 .subscriptionName("sub_topic1") // 声明消费模式为 Key_Shared(Key 共享)

模式 .subscriptionType(SubscriptionType.Key_Shared) .subscribe();

多个 Key_Shared 模式消费者。

多分区主题 + Failover 订阅示例

注意事项:
在该模式下,每个分区同时只会分配给一个消费者实例。若消费者数量多于分区数量,超出数量的消费者无法参与消息,可以通过扩容分区数量不小于消费者数量解决。
在设计 Key 的时候尽量保证 Key 分布均匀。
Failover 模式下不支持延时消息。
1. 生产者代码示例
		
// 构建生产者
Producer<byte[]> producer pulsarClient.newProducer()
.topic(topic)
.enableBatching(false) // 禁用batch
.create();
// 发送消息时设置key
MessageId msgId = producer.newMessage()
// 消息内容
.value(value.getBytes(StandardCharsets.UTF_8))
// 在此处设置key,key相同的消息会发送到同一个分区中
.key("youKey1")
.send();
2. 消费者代码示例
		
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为Failover模式
.subscriptionType(SubscriptionType.Failover)
.subscribe();

开启保序

TDMQ Pulsar 2.9.2 版本集群可以支持 KEY 的顺序投递。如需开启,需要在创建消费者实例时指定 keySharedPolicy。
		
// 构建消费者
Consumer<byte[]> consumer = pulsarClient.newConsumer()
// topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称,从【Topic管理】处复制
.topic("persistent://pulsar-xxx/sdk_java/topic1")
// 需要在控制台Topic详情页创建好一个订阅,此处填写订阅名
.subscriptionName("sub_topic1")
// 声明消费模式为 Key_Shared(Key 共享)模式
.subscriptionType(SubscriptionType.Key_Shared)
// 设置为不允许乱序
.keySharedPolicy(KeySharedPolicy.autoSplitHashRange().setAllowOutOfOrderDelivery(false))
.subscribe();
注意:
集群 2.7.2 版本不支持开启保序,可能造成消息推送堵塞,无法消费;
开启了保序,消费者重启后可能会出现消费速率下降、消息堆积的情况,这是因为在保序模式下新消费者上线后,需要等待消费者上线之前的全部消息都被消费完成后(全部确认后),才能继续消费后面的消息。
相似文档
  • 相关概念: 定时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟到某个时间点被消费,这类消息统称为定时消息。 延时消息:消息在发送至服务端后,实际业务并不希望消费端马上收到这条消息,而是推迟一段时间后再被消费,这类消息统称为延时消息。
  • 本文主要介绍 TDMQ Pulsar 版中消息标签过滤的功能、应用场景和使用方式。 功能介绍: Tag,即消息标签,用于对某个Topic下的消息进行分类。TDMQ Pulsar 版的生产者在发送消息时,指定消息的 Tag,消费者需根据已经指定的 Tag 来进行订阅。
  • 重试 Topic 是一种为了确保消息被正常消费而设计的 Topic 。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试 Topic 中,当重试达到一定次数后,停止重试,投递到死信 Topic 中。
  • 本文主要介绍 TDMQ Pulsar 客户端与连接、客户端与生产/消费者之间的关系,并向开发者介绍客户端合理的使用方式,以便更高效、稳定地使用 TDMQ Pulsar 版的服务。
  • 操作场景: 集群是 TDMQ Pulsar 版中的一个资源维度,不同集群的命名空间、Topic、角色权限等完全隔离。每个集群会有集群的资源限制例如 Topic 总数、消息保留时长等。常见的使用方式如:开发测试环境使用一个专门集群,生产环境使用一个专门的集群。
官方微信
联系客服
400-826-7010
7x24小时客服热线
分享
  • QQ好友
  • QQ空间
  • 微信
  • 微博
返回顶部