`
QING____
  • 浏览: 2228426 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ消息策略

 
阅读更多

    ActiveMQ中提供了众多的“策略”(policy),它们可以在broker端为每个通道“定制”消息的管理方式。本文将简单描述主要的几种Policy。

 

一. DispatchPolcicy: 转发策略(Topic)

    此策略表明broker端消息转发给多个Consumer时,消息被发送的顺序性,这个顺序通常指Consumer的顺序,只对Topic有效,它有3种常用的类型:

    1) RoundRobinDispatchPolicy: “轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,对于匹配消息的第一个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。

    2) StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是,没有移动“订阅者”顺序的操作。

    3) PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority,默认每个consumer的权重都一样。

    4) SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。

 

    "轮询"是比较常用的策略。

<policyEntry topic=">">               
  <dispatchPolicy>  
   <roundRobinDispatchPolicy/>
  </dispatchPolicy>
</policyEntry>

 

二.SubscriptionRecoveryPolicy: 恢复策略(Topic)

    在非耐久订阅者“失效”期间或一个新的Topic,broker可以保留的可追溯的消息量。前提是Topic必须是“retroactive”,我们可以在distination地址中指定此属性,例如:"order.topic?consumer.retroactive=true"。默认情况下,订阅者只能获取“订阅”开始之后的消息,如果Retroactive=true,那么订阅者就可以获取其创建之前的消息列表。此Policy就是用来控制“retroactive”的消息量。

 

    1) FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存最新的消息。

<!-- 1K -->
<fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>

 

    2) FixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。

<!-- 100条 -->
<fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>

 

    3) LastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据

    4) QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。

    5) TimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。

<!-- 可追溯最近1分钟的消息-->
<timedSubscriptionRecoveryPolicy recoverDuration="60000" />

 

    6) NoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。

<policyEntry topic=">">               
  <subscriptionRecoveryPolicy>  
   <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>
  </subscriptionRecoveryPolicy>
</policyEntry

 

三. DeadLetterStrategy: “死信”策略

    Broker将如何管理“死信”。当消息过期后,或者“重发”几次之后仍然不能被正常消息,那么此消息将会被移除到DeadLetter队列中。此后,我们可以通过侦听死信队列,来获取相关通知或者对消息做额外的操作。

    1)  IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”,Topic为“ActiveMQ.DLQ.Topic.”;比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们使用“queuePrefix”“topicPrefix”来指定上述前缀。

    默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。

<policyEntry queue="order">
  <deadLetterStrategy>
	<individualDeadLetterStrategy
	  queuePrefix="DLQ." useQueueForQueueMessages="false" />
  </deadLetterStrategy>
</policyEntry>

 

    上述将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。individualDeadLetterStrategy还有一个属性为“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。 

 

    2) SharedDeadLetterStrategy: 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。

<deadLetterStrategy>
	<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
 </deadLetterStrategy>

 

    3) DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用此策略。AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。

<broker>
    <plugins>
      <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" />
    </plugins>
</broker>

 

对于上述三种策略,还有2个很重要的可选参数,“processExpired”表示是否将过期消息放入死信队列,默认为true;“processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。

 

四. PendingMessageLimitStrategy: 消息限制策略(面向Slow Consumer)

    此策略只对Topic有效,只对nondurable订阅者有效,当通道中有大量的消息积压时,broker可以保留的消息量。为了防止Topic中有慢速消费者,导致整个通道消息积压。(对于Topic而言,一条消息只有所有的订阅者都消费才会被删除)

    1) ConstantPendingMessageLimitStrategy: 保留固定条数的消息,如果消息量超过limit,将使用“MessageEvictionStrategy”移除消息(参见下文)。

<policyEntry topic="PRICES.>">
	<!-- lets force old messages to be discarded for slow consumers -->
	<pendingMessageLimitStrategy>
		<constantPendingMessageLimitStrategy limit="50"/>
	</pendingMessageLimitStrategy>
</policyEntry>

 

    2) PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍数条消息。

<!-- 如果prefetchSize为100,则保留2.5 * 100条消息 -->
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

 

五. MessageEvictionStrategy: 消息剔除策略(面向Slow Consumer)

    配合PendingMessageLimitStrategy,只对Topic有效,只对nondurable订阅者有效。当PendingMessage的数量超过限制时,broker该如何剔除多余的消息。当Topic接收到信息消息后,会将消息“Copy”给每个订阅者,在保存这个消息时(保存策略参见【八】),将会检测pendingMessages的数量是否超过限制(由【四】来检测),如果超过限制,将会在pendingMessages中使用MessageEvicationStrategy移除多余的消息,此后将新消息保存在PendingMessages中。

    1) OldestMessageEvictionStrategy: 移除旧消息,默认策略。

    2) OldestMessageWithLowestPriorityEvictionStrategy: 旧数据中权重较低的消息,将会被移除。(message.getPriority())

    3) UniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。开发者可以指定property的名称,从此属性值相同的消息列表中移除较旧的(根据消息的创建时间)。

<policyEntry topic="SKU.PRICE.>">
	<!-- lets force old messages to be discarded for slow consumers -->
	<pendingMessageLimitStrategy>
		<constantPendingMessageLimitStrategy limit="10000"/>
	</pendingMessageLimitStrategy>
	<messageEvictionStrategy>
		<uniquePropertyMessageEvictionStrategy propertyName="SKU" />
	</messageEvictionStrategy>
</policyEntry>

 

    上述配置,针对SKU.PRICE通道中,只保留10000个最新的消息,当容量达到阀值时,将SKU值相同的消息列表中较旧的消息移除(只保留最新的一条消息)。比如在每条消息中,都封装一个SKU(商品ID)以及其最新价格,那么通过这种策略,值保留相同SKU的最新的一个消息。 

 

六. SlowConsumerStrategy: 慢速消费者策略

    Broker将如何处理慢消费者。Broker将会启动一个后台线程用来检测所有的慢速消费者,并定期关闭关闭它们。

    1) AbortSlowConsumerStrategy: 中断慢速消费者,慢速消费将会被关闭。

<slowConsumerStrategy>  
    <abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 -->  
</slowConsumerStrategy>

 

    2) AbortSlowConsumerStrategy: 如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者。

<slowConsumerStrategy>  
    <abortSlowConsumerStrategy  maxTimeSinceLastAck="30000"/><!-- 30秒滞后 -->  
</slowConsumerStrategy>

 

七. PendingQueueMessageStoragePolicy: 待消费消息策略

    当通道中有大量Slow Consumer时,Broker该如何优化消息的转发,以及在此情况下,“非持久化”消息达到内存限制时该如何处理。

 

    当Broker接受到消息后,通常将最新的消息写入内存以提高消息转发的效率,提高消息ACK的效率,减少对对底层Store的操作;如果Consumer非常快速,那么消息将会立即转发给Consumer,不需要额外的操作;但当遇到Slow Consumer时,情况似乎并没有那么美好。

 

    持久化消息,通常为:写入Store-->线程轮询,从Store中pageIn数据到PendingStorage-->转发给Consumer-->从PendingStorage中移除-->消息ACK后从Store中移除。

 

    对于非持久化数据,通常为:写入内存-->如果内存足够,则PendingStorage直接以内存中的消息转发-->如果内存不足,则将内存中的消息swap到临时文件中-->从临时文件中pageIn到内存,转发给Consumer。

 

    AcitveMQ提供了几个的Cursor机制,它就是用来保存Pending Messages。

 

    1) vmQueueCursor: 将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“非持久化消息”的默认设置,如果Broker不支持Persistent,它是任何类型消息的默认设置。有OOM风险。

    2) fileQueueCursor: 将消息保存到临时文件中。文件存储方式有broker的tempDataStore属性决定。是“持久化消息”的默认设置。

    3) storeCursor: “综合”设置,对于非持久化消息,将采用vmQueueCursor存储,对于持久化消息采用fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。

<!-- persistent为true时,才能将“溢出”的非持久化消息保存为临时文件 -->
<broker persistent="true">
	<persistenceAdapter>
		<!--
		<kahaDB directory="${activemq.data}/kahadb"/>
		-->
		<levelDB directory="${activemq.data}/leveldb"/>
	</persistenceAdapter>
	<!-- 临时文件存储,默认不存储任何临时文件 -->
	<tempDataStore>
		<!--
		<pListStoreImpl directory="${activemq.data}/tmp"/>
		-->
		<levelDB directory="${activemq.data}/leveldb/tmp"/>
	</tempDataStore>
	<!-- 内存限制为512M,如果超过阀值,则转存 -->
	<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">               
		<pendingQueuePolicy>  
			<storeCursor>
				<nonPersistent>
					<fileQueueCursor/>
				</nonPersistent>
			</storeCursor>
		</pendingQueuePolicy>  
	</policyEntry>
	<systemUsage>    
		<systemUsage sendFailIfNoSpace="true">    
		   <memoryUsage>  
			<!-- 所有的通道缓存消息的总内存大小,memoryLimit作为其子模块 -->   	   
			 <memoryUsage limit="6gb"/> 
		   </memoryUsage>    
		</systemUsage>    
	</systemUsage> 
</broker>

  

八. PendingSubscriberMessageStoragePolicy:(Topic)

    针对“非耐久”订阅者。概念和(七)一样,支持三种策略:storeCursor, vmCursor和fileCursor。

 

九. PendingDurableSubscriberMessageStoragePolicy: (Topic)

    针对“耐久”订阅者,支持三种策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。

<policyEntry topic=">" producerFlowControl="false" memoryLimit="32mb">  
    <pendingSubscriberPolicy>  
        <!-- 对于非耐久的订阅者,非持久化消息: vmCursor,fileCursor -->  
         <fileCursor/>  
    </pendingSubscriberPolicy>  
    <pendingDurableSubscriberPolicy>  
        <!-- 对于耐久的订阅者,非持久化消息 -->  
        <!-- storeDurableSubscriberCursor -->  
        <!-- vmDurableCursor -->  
        <!-- fileDurableSubscriberCursor -->  
        <storeDurableSubscriberCursor/>  
    </pendingDurableSubscriberPolicy>  
</policyEntry>

 

十. ForcePersistencyModeBrokerPlugin: 消息传输模式强制转换插件

    将broker接收的消息强制转换成"PERSISTENT"或者"NOT_PERSISTENT"。

<broker>
	<plugins>
		<!-- 将所有消息的传输模式,修改为"PERSISTENT" -->
	  <forcePersistencyModeBrokerPlugin persistenceFlag="true"/>
	</plugins>
</broker>

 

 

分享到:
评论
3 楼 QING____ 2018-04-02  
sheliey1210 写道
   

小淫贼。哈哈
2 楼 sheliey1210 2018-03-28  
   
1 楼 sheliey1210 2018-03-28  
哇,我深入了,我好快乐

相关推荐

    ActiveMQ消息过期时间设置和自动清除解决方案

    详细描述了ActiveMQ消息过期-时间设置和自动清除解决方案。

    ActiveMQ.rar

    消息选择器、消息重递策略、慢消费者处理等 n 十三:杂项技术 包括:监控和管理Broker、集成ActiveMQ和Tomcat、什么时候使用ActiveMQ等 n 十四: ActiveMQ优化 包括:影响ActiveMQ性能的因素、常见的优化方式和配置...

    activemq配置

    ......................... &lt;!-- 持久化策略 --&gt; &lt;!-- Oracle DataSource Sample Setup --&gt; &lt;/bean&gt;

    0924分布式消息通信-ActiveMQ1

    ACK_TYPE表示确认指令的类型,broker可以根据不同的ACK_TYPE去针对当前消息做不同的应对策略REDELIVERED_ACK_TYPE (brok

    java实现的消息中间件之AcitveMQ详解,学习学习

    KahaDB他是默认的持久化策略,所有消息都会顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息...

    ActiveMq由浅入深讲解+面试题50道讲解

    本课程共分36节,内容包括MQ概述和工作流程,启动过程与启动异常分析,消息的基本模型,基于...事务,死信队列,ACK策略,消息的丢失,重复重复消费,消息重发,springmvc集成,集群搭建,集群访问,50道面试精讲等。

    Java思维导图xmind文件+导出图片

    ActiveMQ消息确认及重发策略 ActiveMQ基于Spring完成分布式消息队列实战 Kafka Kafka基于Zookeeper搭建高可用集群实战 kafka消息处理过程剖析 Java客户端实现Kafka生产者与消费者实例 kafka的副本机制及选举...

    javaBook-src:Java相关框架, java 基础, Spring, mybsatis ,SpringBoot ,分布式,微服务

    activemq spring 系列 bean加载 spring 常用注解 spring boot application 相关 spring rest 相关 spring mvc spring boot 邮箱,定时任务 spring webflux spring bot redis 相关 spring boot mysql 相关 spring ...

    polarizer-umb

    这是一个小型JMS / ActiveMQ帮助程序库,专门用于使用TLS证书进行身份验证和加密的ActiveMQ代理。 我可以用来做什么? polarizer-vertx项目在其UMB(统一消息总线)Verticle中使用了此功能。 通过将Message...

    springCloud

    微服务架构引入策略 – 对传统企业而言,开始时可以考虑引入部分合适的微服务架构原则对已有系统进行改造或新建微服务应用,逐步探索及积累微服务架构经验,而非全盘实施微服务架构。 更多关于微服务架构内容-请...

    亿可控- 物联网新零售项目跟立可得差不多 包含视频包含资料

    Java物联网企业级项目实战之亿可控(超完备功能 打造物联网设备监控)系统分析与设计+指标数 据采集+断连监控+数据持久化+5.GPS采集搜索与数据透传19套JAVA...Vue3,Mybatis-Plus,Oauth,Nacos,RabbitMQ,ActiveMQ

    JavaEE求职简历-姓名-JAVA开发工程师.docx

    1.熟悉Java编程基础,了解良好的代码编写规范,了解常用的设计模式,熟悉Java开发常用的API; 2.熟悉Java Web应用开发,...9.了解或使用过的企业级开发常用中间件与第三方库:Nginx、ActiveMQ、CXF、ThymenLeaf、Freemarke

    sbt-akka-microservice:基于 scala、sbt、spray 和 akka 的微服务开发原型

    微服务原型 A 具有 Scala、Akka、Spray 和 Camel/ActiveMQ ...SLF4J 日志记录用于使用自定义合并策略创建 JAR 文件的 Sbt 程序集插件接口结构 POST /api/example1/test (Example1Routes)GET /api/example1/done (Exampl

    大型分布式网站架构与实践

     如何使用分布式消息系统ActiveMQ来降低系统之间的耦合度,以及进行应用间的通信。  垂直化的搜索引擎在分布式系统中的使用,包括搜索引擎的基本原理、Lucene详细的使用介绍,以及基于Lucene的开源搜索引擎工具...

    开源bbs源码java-JavaStudy:Java全系列知识点都总结在这里了,欢迎大家前来学习,如果对你有所帮助,请不要忘记star一下给于

    ActiveMQ的使用 搜索引擎 非关系型数据库 分布式技术Dubbo、Zookeeper Linux与项目部署 设计模式 桥接模式 组合模式 外观模式 享元模式 模版方法模式 命令模式 迭代器模式 观察者模式 中介者模式 备忘录模式 解释器...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    ActiveMQ 常用开源框架 Spring Spring MVC Spring WebFlow spring tx aop ioc Struts ibatis Mybatis CAS Dubbo 工作能力 软实力 应急能力 创新能力 管理能力 分享能力 学习能力 沟通能力 ...

    Fourinone分布式并行计算四合一框架

     Fourinone整体代码短小精悍,跟Hadoop, Zookeeper, Memcache, ActiveMq等开源产品代码上没有任何相似性,不需要任何依赖,引用一个jar包就可以嵌入式使用,良好支持window环境,可以在一台机器上模拟分布式环境,...

    fourinone-3.04.25

    Fourinone整体代码短小精悍,跟Hadoop, Zookeeper, Memcache, ActiveMq等开源产品代码上没有任何相似性,不需要任何依赖,引用一个jar包就可以嵌入式使用,良好支持window环境,可以在一台机器上模拟分布式环境,更...

Global site tag (gtag.js) - Google Analytics