Skip to content

消息队列

Pulsar、ActiveMQ、RabbitMQ、RocketMQ 和 Kafka 五款消息中间件的区别:

维度PulsarActiveMQRabbitMQRocketMQKafka
架构模型分布式架构,支持多租户中心化架构中心化架构分布式架构分布式架构
消息协议支持多种协议 (AMQP, MQTT)AMQP, STOMP, OpenWireAMQP自定义协议自定义协议 (Kafka协议)
持久化机制磁盘持久化,支持分区JDBC, KahaDB, LevelDBMnesia, RabbitDB磁盘持久化磁盘持久化
消息顺序支持单分区消息顺序支持支持支持支持分区内顺序
扩展性动态扩展水平扩展有一定限制水平扩展有限易于水平扩展高度可扩展
性能高吞吐量,低延迟吞吐量适中吞吐量适中高吞吐量,低延迟高吞吐量
管理与监控管理界面,支持多租户Web控制台Web管理控制台控制台和监控工具控制台 (Kafka Manager)
使用场景实时流处理、大数据企业级应用,消息队列轻量级消息传递分布式事务日志聚合、数据流处理

Apache Pulsar

bash
docker run -it \
-e PULSAR_PREFIX_xxx=yyy \
-p 6650:6650  \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar sh \
-c "bin/apply-config-from-env.py \
conf/standalone.conf && \
bin/pulsar standalone"
java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
        .topic(topic)
        .subscriptionName("sub-negative-ack")
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .negativeAckRedeliveryBackoff(MultiplierRedeliveryBackoff.builder()
            .minDelayMs(1000)
            .maxDelayMs(60 * 1000)
            .multiplier(2)
            .build())
        .subscribe();

Message<byte[]> message = consumer.receive();

// call the API to send negative acknowledgment
consumer.negativeAcknowledge(message);

message = consumer.receive();
consumer.acknowledge(message);

死信主题

java
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
              .topic(topic)
              .subscriptionName("my-subscription")
              .subscriptionType(SubscriptionType.Shared)
              .deadLetterPolicy(DeadLetterPolicy.builder()
                    .maxRedeliverCount(maxRedeliveryCount)
                    .build())
              .subscribe();

订阅类型

java
Producer<byte[]> producer = client.newProducer()
        .topic("my-topic")
        .batcherBuilder(BatcherBuilder.KEY_BASED)
        .create();

订阅模式指示游标属于持久类型或非持久类型

java
Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic("my-topic")
                .subscriptionName("my-sub")
                .subscriptionMode(SubscriptionMode.Durable)
                .subscribe();

Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic("my-topic")
                .subscriptionName("my-sub")
                .subscriptionMode(SubscriptionMode.NonDurable)
                .subscribe();

多主题订阅示例

java
import java.util.regex.Pattern;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;

PulsarClient pulsarClient = // Instantiate Pulsar client object

// Subscribe to all topics in a namespace
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();

// Subscribe to a subsets of topics in a namespace, based on regex
Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(someTopicsInNamespace)
                .subscriptionName("subscription-1")
                .subscribe();