消息队列
Pulsar、ActiveMQ、RabbitMQ、RocketMQ 和 Kafka 五款消息中间件的区别:
维度 | Pulsar | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|---|
架构模型 | 分布式架构,支持多租户 | 中心化架构 | 中心化架构 | 分布式架构 | 分布式架构 |
消息协议 | 支持多种协议 (AMQP, MQTT) | AMQP, STOMP, OpenWire | AMQP | 自定义协议 | 自定义协议 (Kafka协议) |
持久化机制 | 磁盘持久化,支持分区 | JDBC, KahaDB, LevelDB | Mnesia, RabbitDB | 磁盘持久化 | 磁盘持久化 |
消息顺序 | 支持单分区消息顺序 | 支持 | 支持 | 支持 | 支持分区内顺序 |
扩展性 | 动态扩展 | 水平扩展有一定限制 | 水平扩展有限 | 易于水平扩展 | 高度可扩展 |
性能 | 高吞吐量,低延迟 | 吞吐量适中 | 吞吐量适中 | 高吞吐量,低延迟 | 高吞吐量 |
管理与监控 | 管理界面,支持多租户 | Web控制台 | Web管理控制台 | 控制台和监控工具 | 控制台 (Kafka Manager) |
使用场景 | 实时流处理、大数据 | 企业级应用,消息队列 | 轻量级消息传递 | 分布式事务 | 日志聚合、数据流处理 |
Apache Pulsar
bash
docker run -it \
-e PULSAR_PREFIX_xxx=yyy \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar sh \
-c "bin/apply-config-from-env.py \
conf/standalone.conf && \
bin/pulsar standalone"
java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-negative-ack")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
.minDelayMs(1000)
.maxDelayMs(60 * 1000)
.multiplier(2)
.build())
.subscribe();
Message<byte[]> message = consumer.receive();
// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);
message = consumer.receive();
consumer.acknowledge(message);
死信主题
java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.build())
.subscribe();
订阅类型
java
Producer<byte[]> producer = client.newProducer()
.topic("my-topic")
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
订阅模式指示游标属于持久类型或非持久类型
java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("my-topic")
.subscriptionName("my-sub")
.subscriptionMode(SubscriptionMode.Durable)
.subscribe();
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic("my-topic")
.subscriptionName("my-sub")
.subscriptionMode(SubscriptionMode.NonDurable)
.subscribe();
多主题订阅示例
java
import java.util.regex.Pattern;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
PulsarClient pulsarClient = // Instantiate Pulsar client object
// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(allTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(someTopicsInNamespace)
.subscriptionName("subscription-1")
.subscribe();