ActiveMQ提供了虚拟通道的特性(Virtual Destination),它允许一个逻辑通道(logical destination)映射成一个或者多个物理通道(physical destination);它可以非常灵活的解决"消息整合"方面的问题,它可以实现:
1) 提供了VirtualTopic特性,可以让一个订阅者的消息列表,作为Queue来消费。
2) 提供了Composite特性,可以把一个逻辑通道中的消息,转发到任意多的物理通道中。
一. VirtualTopic
Topic最大的限制就是同一个ClientId的订阅者,任何时刻只能有一个活跃。所以我们在分布式部署时,就会很麻烦,比如一个应用部署成多个实例,且它们都有相同的Topic Consumer配置,那么意味着一个实例部署成功后,其它的实例都会因为无法订阅Topic而导致故障;同时也意味着,如果这个Topic Consumer失效后,我们不能自动让其他Consumer的接管它。但是Queue却没有这些限制,因为Queue可以同时有任意多个消费者,它们可以并发的消费消息,从而实现“负载均衡”。如果我们期望Topic也能如此,那么可以用VirtualTopic。
默认情况下,ActiveMQ开启了VirtualTopic特性,此时Topic的默认命名空间为“VirtualTopic.>”,即命名模式为“VirtualTopic.<topicName>”,比如“VirtualTopic.order”;那么对于consumer而言,对应的Queue命名模式为“Consumer.*.VirtualTopic.<topicName>”,其中“*”的语义类似于clientId。代码示例如下:
//物理通道名称为:order //producer端 Topic top = session.createTopic("VirtualTopic.order"); producer = session.createProducer(top); //consumer端 //其中dbcenter为clientId //此后我们可以创建多个consumer,它们使用同一个clientId Queue queue = session.createQueue("Consumer.dbcenter.VirtualTopic.order"); MessageConsumer consumer = session.createConsumer(queue);
当然,ActiveMQ的默认配置,还不能满足我们的日常需求,比如:
1)一个现有的topic,如果需要转换为VirtualTopic
2)希望物理topic与虚拟topic可以并存使用,比如一个Topic,已经有多个consumer,但是新加入了一个consumer,且此consumer需要使用VirtualTopic模式否则无法完成正常业务,如果将现有的producer和consumer的Topic地址都调整一遍是不太现实的,那么此时我们就需要通过调整配置来完成,即自定义VirtualTopic。(如下配置,将所有的topic映射为VirtualTopic)
<broker xmlns="http://activemq.apache.org/schema/core"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <virtualTopic name=">" prefix="Consumer.*." selectorAware="false"/> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> </broker>
<!-- ActiveMQ的默认方式,等效于如下配置 --> <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." selectorAware="false"/>
调整配置之后,我们的producer端代码不需要任何修改,即现有的物理Topic已经被映射为虚拟topic,且topic命名中无需添加“VirtualTopic.”前缀。代码示例如下:
//producer端 Topic top = session.createTopic("order"); producer = session.createProducer(top); //consumer端 //其中dbcenter为clientId //此后我们可以创建多个consumer,它们使用同一个clientId Queue queue = session.createQueue("Consumer.dbcenter.order"); MessageConsumer consumer = session.createConsumer(queue);
对于order而言,“dbcenter”相当于是一个订阅者;Broker将dbcenter订阅的消息转发到了“Consumer.dbcenter.order”队列中;最重要的一点,这个Queue完全具有队列的所有特性,它的Consumer可以并行消费。
其中还有一个重要的参数"selectorAware",它表示从Topic中将消息转发给Queue时,是否关注Consumer的selector情况。如果为false,那么Topic中的消息全部转发给Queue,否则只会转发匹配Queue Consumer的selector的消息。需要非常注意,当selectorAware为true时,如果消息不匹配任何selector或者Queue中没有任何Consumer活跃,那么消息将不会转发给Queue。
此外,VirtualTopic仍然可以被正常的订阅者消费(默认配置情况下),即:
Topic topic = session.createTopic("VirtualTopic.order"); TopicSubscriber subscriber = session.createDurableSubscriber(topic,"dbcenter");
同时需要注意,VirtualTopic只会转发“Client标识”注册之后的消息,且即使Queue消费了消息,VirtualTopic中的消息仍然不会被删除(看起来仍然是Dequeued=0),对于Broker而言,逻辑Queue不被认为是一个“Durable Subscriber”,只有真正的Subscriber消费消息后,Topic中的消息才会Dequeue。不过消息Dequeue后,不会影响Queue中的消息,因为这是基于Copy的。
到目前为止,我尚不清楚,如果VirtualTopic中没有真正的Subscriber,这些消息该如何Dequeue。
当真正的subscriber和Queue都同时存在VirtualTopic中的时候,而且你的broker架构采用了“forward-brige”结构,那么你需要增加如下配置来避免消息的重复转发问题。在forward-brige架构中,任何通道中的消息都会forward到其他network node中(其他broker上),当然这个虚拟的Queue的消息也不例外。
<networkConnectors> <networkConnector uri="static://(tcp://localhost:61617)"> <excludedDestinations> <!-- prefix和VirtualTopic保持一致 <queue physicalName="Consumer.*.VirtualTopic.>"/> </excludedDestinations> </networkConnector> </networkConnectors>
二. Composite Destinations
复合通道,它允许一条消息在多个物理通道间转发,就像一个通道映射成多个一样(one-many);比如复合通道A,映射成B和C,那么发往A的消息会同时转发给B和C,那么消费者可以直接通过B或者C获取消息;
这是一种实现消息复制转发、通道映射的便捷办法。复合通道包括CompositeQueue和CompositeTopic两种。
<broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core"> <destinationInterceptors> <virtualDestinationInterceptor> <virtualDestinations> <compositeQueue name="order"> <forwardTo> <queue physicalName="order.dbcenter" /> <topic physicalName="order.statistic" /> </forwardTo> </compositeQueue> <!-- <compositeTopic name="order" forwardOnly="false"> <forwardTo> <queue physicalName="order.dbcenter" /> <topic physicalName="order.statistic" /> </forwardTo> </compositeTopic> --> </virtualDestinations> </virtualDestinationInterceptor> </destinationInterceptors> </broker>
上述配置forwardOnly属性表示发往CompositeQueue中的消息是否“仅仅转发,而不本地保留”,如果forwardOnly为true,那么消息将不会在order队列中保留,即order队列中不会有任何消息。如果为false,那么消息将会转发完成后,添加到order中,消费者仍然可以消费order队列中的消息。无论是Compsite通道还是转发的通道,它们和普通的通道没有任何区别,开发者仍然可以像使用普通的通道一样使用它们(消费消息和发送消息)。
很多时候,我们希望在转发消息时,能够使用selector,此时我们可以使用filteredDestination,这样我们可以消息转发时控制消息。
<compositeQueue name="MY.QUEUE"> <forwardTo> <filteredDestination selector="orderType = 1" queue="food.order"/> <filteredDestination selector="status = 1" topic="order.statistic"/> </forwardTo> </compositeQueue>
相关推荐
activemq 虚拟topic配置,可以将一个 topic转发为多个队列和多个topic或者将一个队列转发为多个topic和多个队列
基于SpringBoot开发的ActiveMQ虚拟主题客户端,达到消费者多点消费、负载均衡和故障转移的目的。 详情可以参考这里: https://my.oschina.net/noryar/blog/1573047 https://my.oschina.net/noryar/blog/1575003
ActiveMQ的安装与使用ActiveMQ的安装与使用ActiveMQ的安装与使用
ActiveMQ5.13 安装与配置
ActiveMQ与Spring线程池整合的一个实例。 lib库没有上传。 对于实例的讲解,在竹子的论坛有我对这个实例的帖子(http://www.java2000.net/viewthread.jsp?tid=1167) lib中包含: apache-activemq-4.1.1.jar ...
ActiveMQ与Tomcat整合教程,好像是word2007的,嘿嘿
activemq 配置说明与activemq入门讲解
ActiveMQ与Spring整合示例Demo,ActiveMQ安装在linux系统中
ActiveMQ与Tomcat整合教程.docx
activemq教程,activemq与tomcat整合
ActiveMQ与spring整合进行封装,实现全注解开发,并且支持topic与queue两种模式无缝切换,对外提供统一的接口调用,简化操作。对连接池进行优化,容器实例动态生成。
activemq与spring的整合的简单案例,整体说明见博文:http://blog.csdn.net/u012897777/article/details/78748221
activemq性能与高可用性测试,activemq性能与高可用性测试。
ActiveMQ与Zookeeper集群测试代码,用于测试高可用效果 。。。。。。。。。。。。。。。
activemq activeMq笔记.docx
activeMQ与spring整合开发的例子程序,消息队列技术
cJMS之ActiveMQ与Spring整合源码
该资源是spring与activemq的基础整合,适合消息中间件的初学者
activemq与spring整合发送jms消息入门实例 jar 包和实例代码都在压缩包里了
支持持久化的采用Spring整合activeMQ与quartz的JMS数据同步实例,包含依赖的jar包