`
QING____
  • 浏览: 2230735 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ中Producer特性详解

 
阅读更多

前言

    从本文中你可以得到如下几个问题的解答:

    1) 设定消息的时间戳有什么作用

    2) 消息的timeToLive与过期原理

    3) Producer同步、异步发送消息,以及调优

    4) 消息priority与顺序,以及其中的误解。

 

    Producer作为ActiveMQ的消息发送端,是开发工程师经常需要面对的,我们需要多了解一些Producer的特性,以便开发出更高效的应用。

 

    如下为典型的代码示例:

String brokerUrl = "tcp://localhost:61616";
String queueName = "test-queue";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
Connection connection = factory.createConnection();
//ActiveMQFactory.createConnection(brokerUrl);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
Destination queue = session.createQueue(queueName);  
MessageProducer producer = session.createProducer(queue);
for(int i=0; i<5; i++){
	producer.send(session.createTextMessage("Hello,ActiveMQ!!"));
}
//....
connection.close();

 

    一个Producer的创建过程中,会涉及到brokerUrl、destinationUri、Connection、session、ACK模式主要的几个概念。它们都有一些隐藏的“秘密”值得我们去探索。此外producer复杂发送消息,还有很多与消息有关的特性需要注意。

 

1. disableMessageID

    在发送消息时,是否禁用MessageID属性。JMS的标准中并没有约定消息的ID是由Producer创建还是由Broker创建,禁用MessageID在一定程度上会减少网络开支,事实上这个属性在ActiveMQ没有任何意义。无论任何时候,Producer总会在发送消息之前,为Message生成全局唯一的MessageID。(JMSMessageID,由producerId + “:” +  sequenceNumber组成)

 

2. disableMessageTimestamp

    是否禁用消息时间戳。时间戳用来表示消息被producer发送的时间(非broker接收的时间),通常此timestamp和TTL一起决定消息是否已经过期;消费者也可以根据此timestamp获知消息发送的时间,以及在selector中使用timestamp来过滤过期或者较旧的消息等。

 

    如果属性值为true,则在发送消息时会将消息的JMSTimestamp属性值设为为当前时间的时间戳;否则JMSTimestamp属性值将保持默认(0)。

 

  • 可以在brokerUrl中设定全局的默认值:  "tcp://localhost:61616?jms.disableTimeStampsByDefault=true"

  • messageProducer.setDisableMessageTimestamp(boolean)

    此选项只会影响到消息在producer端创建的时间,在ActiveMQ中,Message被额外的增加了2个属性,开发者可以获得更多关于消息流动的相关时间信息(ActiveMQMessage):

 

  • JMSActiveMQBrokerInTime: 表示消息被broker接收的时间戳

  • JMSActiveMQBrokerOutTime: 消息离开broker即将传送给consumer的时间戳。

3. defaultTimeToLive

    消息的存活时间。用来约束消息的过期时间,即JMSExpiration属性。注意timeToLive属性只会在DisableMessageTimestamp=false的情况下才有意义。

    JMSExpiration = timestamp + timeToLive。timeToLive默认值为0,即表示消息永不过期。只有当timeToLive值 > 0时,JMSExpiration属性才会被赋值。

 

  • messageProducer.setTimeToLive(long)

  • messageProducer.send(Message message, int deliveryMode, int priority, long timeToLive): 可以在消息发送时,为当前消息设定ttl。

    如果开发者使用TTL来判定消息的过期,那么就首先需要确保Producer、broker两者的系统时间要尽可能的一致,此外我们还需要Consumer也尽可能的和broker的时间保持一致。如果Produer的时间相对broker而言落后太多,有可能导致发送的消息直接被broker抛弃(broker接收消息后会检测是否过期,对于已经过期的消息则直接抛弃),消费者将无法获得相关消息。

 

    Broker将会在接收Producer消息时,以及将消息发送给Consumer之前都会检测消息是否过期,判断过期的方法也就是根据JMSExpiration和当前时间戳比较。如果消息过期,将会把消息发送到DLQ(Dead Letter Qeue)中,那么事实上此消息不会被Consumer消费;不过我们可以通过特殊的方式检测这些DLQ,比如侦听"ActiveMQ.DLQ"队列。由此可见,在使用TTL的时候,如果Producer与Broker的系统时间差距较大,将会导致消息“丢失”。

 

    不过,还有一种“可怕”情况,如果broker端对DLQ使用Discard策略或者Broker没有开启DLQ相关策略,这些过期的消息可能将不复存在。

 

  <destinationPolicy>
   <policyMap>
     <policyEntries>
       <policyEntry queue=">" topic=">">
         <deadLetterStrategy>
           <sharedDeadLetterStrategy processExpired="false" />
         </deadLetterStrategy>
		 <!-- discard all -->
		 <!--
		 <discardingDeadLetterStrategy />
		 -->
       </policyEntry>
     </policyEntries>
   </policyMap>
  </destinationPolicy>
 

 

4. alwaysSyncSend/useAsyncSend

    用来设定producer(或者session)发送消息的方式:同步/异步。

 

    alwaysSyncSend表示消息将总会被同步发送(request<->response),即消息通过底层transport链接发送之后,将会阻塞直到收到broker端的ProducerAck或者sentTimeout。同步发送,是一种担保能力强但发送效率较低的方式,如果消息发送失败,client端可以立即获得异常。如果alwaysSyncSend值为true,则producer将使用同步发送所有消息(同时忽略useAsyncSend选项)

 

    useAsyncSend表示消息可以被异步发送(oneway),异步发送并不是消息在client缓存起来,而是消息也将立即被transport发送给broker,只是不会阻塞等待broker的ACK。消息发送给broker之后,producer.send方法将会立即返回。异步发送,通常配合broker端“Flow Control”手段使用,它可以最大化produer端的发送效率。我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存,同时也会导致broker端性能消耗增加;此外它不能有效的确保消息的发送成功。在useAsyncSend的情况下,客户端需要容忍消息丢失的可能。

    

//brokerUrl中指定异步/同步发送
jms.alwaysSyncSend=false&jms.useAsyncSend=true

    1) 当alwaysSyncSend=false时,对于“NON_PERSISTENT”(非持久化)消息将使用“异步发送”;对于非持久化消息,使用异步发送是最佳的选择,我们通常使用这种手段调优。

 

    2) 当alwaysSyncSend=false时,如果指定了useAsyncSend=true,都将采用异步发送,包括“PERSISTENT”类型的消息,也将使用异步发送。

    3) 当alwaysSyncSend=false时,无论如何,在事务中的消息,都将使用异步发送。

    4) 当alwaysSyncSend=true时,所有类型的消息,都将采用同步发送。

    5) 总结:默认情况下,非持久化消息、事务内的消息均采用异步发送;对于持久化消息,则采用同步发送

 

5. sendTimeout

    消息发送最大阻塞(等到ACK)时间。这个参数和syncSend方式配合使用,在同步发送方式中,send方法将会阻塞直到broker反馈ACK,不过这种阻塞的时长无法预期,我们需要使用sendTimeout来设定最大的阻塞时间,如果阻塞超时,将会抛出RequestTimedOutIOException。此值默认为0,表示永不超时。

 

  • 在brokerUrl中设定

//在brokerUrl中指定,单位:毫秒
jms.sendTimeout=30000
  • connection.setSendTimeout(long) 

    此外需要注意,如果指定了sendTimeout值 >0,无论何时,所有的消息都将同步发送。(忽略alwaysSyncSend、useAsyncSend、消息持久化方式等)。如果sendTimeout <0,则效果和=0一致。

 

6. socket优化

    如果我们的brokerUrl中,使用了TCP/UDP等transport,我们还可以直接对socket进行优化,这些参数需要再brorkerUrl中指定:

 

tcp://localhost:61616?socket.sendBufferSize=1024

 

7. windowSize(异步发送)

    窗口尺寸,用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的尺寸,且只对异步发送有意义。每次发送消息之后,都将会导致memoryUsage尺寸增加(+message.size),当broker返回producerAck时,memoryUsage尺寸减少(producerAck.size,此size表示先前发送消息的大小)。可以通过如下2种方式设置:

  • 在brokerUrl中设置: "tcp://localhost:61616?jms.producerWindowSize=1048576",这种设置将会对所有的producer生效。

  • 在destinationUri中设置: "test-queue?producer.windowSize=1048576",此参数只会对使用此Destination实例的producer失效,将会覆盖brokerUrl中的producerWindowSize值。

     此值越大,意味着消耗Client端的内存就越大。

 

    至此我们已经清楚,任何发送到broker端的消息,broker总会在存储成功后回传ProducerAck信息,并且在ACK中包含消息的size。当producer发送消息时,会首先检测memoryUsage中是否有足够的空间(根据message.size判断),如果空间足够,则消息正常发送;否则将会阻塞,直到收到producerACK且memoryUsage空间释放足够多。(注意,对于持久化消息,只要broker存储消息成功即立即发送ProcuerAck)

 

    这是一个非常重要而且有效的调优参数之一;它在producer端消息发送比较密集时,或者producer发送速率与consumer消费速度失衡严重时,对维护broker实例性能稳定具有重要意义。比如,当producer短时间内大批量发送消息,而consumer消费速度相对较低时,会对broker端的存储造成巨大冲击,在极端情况下,可能导致OOM或者broker宕机。所以,我们通常开启此参数,且指定相对合理的值。

 

    仅仅在producer指定windowSize,可以在Client端避免这些问题,不过这只是一个方面;但是从全局来看,还不够全面,我们还需要在broker端使用“Flow Control”策略来避免上述情况的发生。不过对于"持久化"类型的消息,当broker存储成功之后,总是立即发送ack,所以我们不需要担心会带来潜在的危险。因为“windowSize”、“Flow Control”通常是针对“非持久化”消息。

 

    “Flow Control”通常称为“流量控制”,它的核心理念就是“按需交付”。如果每个Producer指定的windowSize为64M,但是我们有10个这样的producer,那么意味着broker端在最坏的情况下,需要承载640M数据,不过很不幸的是broker上可供cache的空间只有512M,那么此后broker可能因为内存不足而强制挂起Producer的物理连接(connection),从而导致在connection上无法继续发送任何消息。这会引入另外一个问题,当connection上还有当前通道(Queue或者Topic)的consumer时,会带来死锁;这导致consumer的ACK无法发送-->进而导致消息继续消费,以致于形成“消息无法消费”“也无法发送”的死锁情况。所以通常情况下,我们不能让Consumer和Producer共享一个物理Connection。

 

  “Flow Control”所能做的就是:当broker内存不足时,让producer暂停一下,直到消息消费之后腾出足够的空间。可以让消息密集发送时,broker能够协调Producer发送的“频率”,同时确保自己的性能处于可控范围。

 

<policyEntry queue=">" producerFlowControl="true" memoryLimit="32mb">  
  <!-- only for queue -->
  <!-- topic is very different from it -->
  <pendingQueuePolicy>
    <!-- non_persistent: use "vmQueueCursor",persistent: use "fileQueueCursor" -->
    <storeCursor/>
	<!-- also can do this -->
	<!--
	<storeCursor>
		<nonpersistent>
			<vmQueueCursor/>
		</nonpersistent>
	</storeCursor>
	-->
  </pendingQueuePolicy>
</policyEntry>

 

    上述配置描述的是,对于任意一个queue都使用32m内存保存非持久化消息,如果超过限制,将阻塞Producer链接。不过强制挂起Producer似乎有些过于粗暴,我们可以增加如下配置,当borker内存不足时,向producer发送一个异常,此时producer可以通过捕获异常来“暂停与重试(sleep一会再发送)”或者提交事务(如果事务中有大量消息且消息为持久化类型的)。

 

<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="512mb"/>
   </memoryUsage>
 </systemUsage>
</systemUsage>

 

8. Transaction

    producer端可以使用事务来实现多条消息的“原子性”。据上所述,事务中的消息,都会采用asyncSend方式发送,对于broker而言将会把当前producer事务中的消息cache在内存中,直到事务提交。 

 

//如果使用事务,其中transacted=true,ackMode = Auto_ACK,事实上ackMode对producer并无意义。
connection.createSession(boolean transacted, int acknowledgeMode)

 

    对于producer而言,事务并没有特别需要注意的地方,无论是持久化/非持久化,建议不要在一个事务中同时发送过多的消息,此外建议设定windowSize,即“事务” + “windowSize”是推荐的一种高效的方式。

 

    开发者需要再合适的时机,调用session.commit()方法来提交事务,不过需要注意session非线程安全,建议在事务类型下,每个producer独占一个session已防止多线程(多producer)提交的时机不同而造成混乱。 

 

9. Priority(权重)与消息传送顺序

    我们可以在发送消息时,指定消息的权重,broker可以建议权重较高的消息将会优先发送给Consumer。在某些场景下,我们通常希望权重较高的消息优先传送;不过因为各种原因,priority并不能决定消息传送的严格顺序(order)。

 

    JMS标准中约定priority可以为0~9的数值,值越大表示权重越高,默认值为4。不过activeMQ中各个存储器对priority的支持并非完全一样。比如JDBC存储器可以支持0~9,因为JDBC存储器可以基于priority对消息进行排序和索引化;但是对于kahadb/levelDB等这种基于日志文件的存储器而言,priority支持相对较弱,只能识别三种优先级(LOW: < 4,NORMAL: =4,HIGH: > 4)。在broker端,默认是不支持priority排序的,我们需要手动开启:

 

<policyEntry queue=">" prioritizedMessages="true"/>

 

    一旦开启了此属性,此后消息存储时,将会按照prioprity的倒序索引化消息(比如kahadb B+Tree,此后priority将作为索引的一部分)。此后broker从存储器中获取消息时,权重较高的消息将会被优先获取;对于JDBC等其他存储器,可能在获取消息时,按照priority作为排序列来筛选消息。

 

    我们首先简单的描述一下,消息在broker端pending的过程,这涉及到prefetch机制以及消息是否持久化等方面的问题。在broker端,为了优化Consumer消费的效率,通常开启prefetch策略,即从通道中(Queue/Topic)批量加载多条消息,这些消息可能来自内存(非持久化),也可能来自存储文件(持久化消息,或者非持久化消息被swap到临时文件中);borker会为每个Consumer创建一个基于内存的pending buffer,用来保存即将发送给Consumer的消息列表。当buffer中的消息被Consumer消费之后,将会从内存或者文件中继续加载多条消息,然后再根据需要将一定量的消息放入到pending buffer中。由此可见,我们只能保证每次prefetch的消息列表是按照priority排序的,但是有可能在buffer中的消息还没有发送之前,会有更高优先级的消息被写入文件或内存,事实上这已经不能改变消息发送的顺序了;因为我们无法在全局范围内,保证Consumer即将消费的消息是权重最高的!!

 

    不过对于“非持久化”类型的消息(如果没有被swap到临时文件),它们被保存在内存中,它们不存在从文件Paged in到内存的过程,因为可以保证优先级较高的消息,总是在prefetch的时候被优先获取,这也是“非持久化”消息可以担保消息发送顺序的优点。

 

     Broker在收到Producer的消息之后,将会把消息cache到内存,如果消息需要持久化,那么同时也会把消息写入文件;如果通道中Consumer的消费速度足够快(即积压的消息很少,尚未超过内存限制,我们通过上文能够知道,每个通道都可以有一定的内存用来cache消息),那么消息几乎不需要从存储文件中Paged In,直接就能从内存的cache中获取即可,这种情况下,priority可以担保“全局顺序”;不过,如果消费者滞后太多,cache已满,就会触发新接收的消息直接保存在磁盘中,那么此时,priority就没有那么有效了。

 

    在Queue中,prefetch的消息列表默认将会采用“轮询”的方式(roundRobin,注意并不是roundRobinDispatch)[备注:因为Queue不支持任何DispatchPolicy],依次添加到每个consumer的pending buffer中,比如有m1-m2-m3-m4四条消息,有C1-C2两个消费者,那么: m1->C1,m2->C2,m3->C1,m4->C2。这种轮序方式,会对基于权重的消息发送有些额外的影响,假如四条消息的权重都不同,但是(m1,m3)->C1,事实上m2的权重>m3,对于C1而言,它似乎丢失了“顺序性”。

 

//参见org.apache.activemq.broker.region.Queue
//doActualDispatch方法
//伪代码,描述

//pendingList: 从store中PageIn 的消息列表,亟待发送
Iterator it = pendingList.iterator();
//为了当前Page的消息发送,创建一个消费者列表的copy
//每个Queue都持有目前所有的消费者列表。
//在broker端,Subscription负责和Consumer客户端通讯
List<Subscription> copyConsumers = new ArrayList<Subscription>(this.allSubscriptions);
Set<Subscription> fullConsumers = new HashSet<Subscription>();
while(it.hasNext()){
    Message mx = it.next();
    for(Subscription consumer : copyConsumers){
		if(fullConsumers.contains(consumer)){
			break;//如果当前consumer亟待确认的消息已经超过了prefetch
		}
		//如果当前consumer中的pendingBuffer没有满
		if(!consumer.isFull()){
			it.remove();
			consumer.dispatch(mx);//异步发送消息,addBuffer
			if(!strictOrderDispatch){
				//如果不是严格顺序发送
				//对consumers按照权重排序
				//权重高的consumer,将会被有限填充buffer
				//当权重相同时,将会按照它们最后发送消息的ID,倒序排列
				//即消息ID较小的,将会优先获取新消息。
				Collections.sort(this.allSubscritpions);
				//重建copy
				copyConsumers = new ArrayList<Subscription>(this.allSubscriptions); 
			}
		}else{
			fullConsumers.add(consumer);
		}
			
}
if(!pendingList.isEmpty()) {
	//剩余的消息,将会被cache起来,继续轮训上述过程
}

 

 

    为了让priority的消息更加具备顺序性,我们可以通过如下配置来调整:

     

//queue or topic
<policyEntry queue=">" strictOrderDispatch="true" />

 

    strictOrderDispatch“严格顺序转发”,这是区别于“轮询”的一种消息转发手段;不过不要误解它为“全局严格顺序”,它只不过是将prefetch的消息依次填满每个consumer的pending buffer。比如上述例子中,如果C1-C2两个消费者的buffer尺寸为3,那么(m1,m2,m3)->C1,(m4)->C2;当C1填充完毕之后,才会填充C2。由此这种策略可以保证buffer中所有的消息都是“权重临近的”、有序的。(需要注意:strictOrderDispatch并非是解决priority消息顺序的问题而生,只是在使用priority时需要关注它)。

 

    对于Queue而言,仅仅使用strictOrderDispatch并不能完全解决顺序问题,它可能是相对高效但是比较粗略的方式;如果需要严格保证有序性,我们需要按照如下方式配置:

 

<policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1" />

 

    useCache=false来关闭内存,强制将所有的消息都立即写入文件(索引化,但是会降低消息的转发效率);queuePrefetch=1来约束每个consumer任何时刻只有一个消息正在处理,那些消息消费之后,将会从文件中重新获取,这大大增加了消息文件操作的次数,不过每次读取肯定都是priority最高的消息。[对于broker而言,如果指定了prioritizedMessage将在存储时就根据消息的权重建立索引顺序,在内存中使用PrioritizedPendingList保存消息;否则将使用OrderedPendingList]

 

    “strictOrderDispatch”也适用于Topic,broker可以保证所有Subscriber获得的消息的顺序是一致的。不过Topic支持DispatchPolicy,但是不支持"strictOrderDispatch"属性,这个与Queue有所不同。

 

<policyEntry topic=">">
	<dispatchPolicy>
		<strictOrderDispatchPolicy />
	</dispatchPolicy>
</policyEntry>

 

 

    此外,对于Consumer而言,我们需要关注一个参数:messagePrioritySupported;它表示consumer端是否支持权重排序,默认为true,即当Consumer客户端使用了prefetchSize时,将会对这些已经到达Session但还没有转发给Consumer的消息列表,按照权重排序。我们可以通过这种方式开启:

 

//brokerUrl中
jms.messagePrioritySupported=true

 

    如果你关闭了此选项,那么需要注意,当高权重的消息因为消费异常而重发时,将不能被优先消费。 

 

10. Producer消息传送模型

 

 

 

11. 如何提升Producer端消息发送效率?

    在某些场景下,我们的Producer的个数是非常有限的,可能只有几个,比如基于Queue的“订单接入网关”(生成订单原始信息并负责传递),但是相应的Consumer的个数相对较多,在整体上Producer效能小于Consumer。还有一些场景,Producer的数量非常,消息量也很大,但是Consumer的个数或者效能相对较低,比如“用户点击流”、“用户消息Push系统”等。

 

    究竟该如何提高消息发送的效率,参考上文,几个因素值得我们考虑:

    1) 消息持久化:持久化类型的消息,对broker端性能消耗远远大于非持久化类型;这归结于ActiveMQ本身对持久化消息确保“最终一致性”,持久化意味着“消息不丢失”(无论过期,还是DLQ),即当broker接收到消息后需要一次强制性磁盘同步(fsync)[备注:不过基于日志的存储器kahadb/levelDB提供延迟写入的特性,如果开启延迟写入,将会在broker物理失效时有丢失数据的潜在风险];对于Consumer在消费消息后,也会触发磁盘写入(通常为标记消息已消费,或者移除消息的相关索引信息;这个过程通常是延迟写入);此外,通常broker端还会开启相关的“过期消息检测”线程,将存储器中的数据载入内存并检测,这个过程也是内存、磁盘IO消耗的。由此可见,持久化类型的消息从始至终,都在“拖累”着系统的性能和吞吐能力。

 

     这就要求,开发者根据实际需要定夺消息的传输模式(持久化、非持久化),对于数据可靠性要求较低,容忍数据在极端情况下丢失的场景中,我们需要果断的使用NON_PERSISTENT。

 

    2) 消息属性: 通过Producer发送的消息(Message)中,除了消息本身的负荷体之外(content),还有大量的JMS属性和Properties可以设置,比如timestamp、priority等。因为JMS中,支持对JMS属性和properties使用selector,那么这些内容将会加大和复杂化message header,我们尽可能的在properties中携带更少、更小的数据。此外,我们还不能通过Message传递较大的文本、流数据,尽管activeMQ支持这些特性,但是它会对broker带来更多消息存储、控制成本,事实上,较大数据的传递,使用activeMQ是不理智的。

    此外,我们需要慎重的使用Priority,这会对底层的存储器带来额外的性能开支。

 

    3) 异步发送:如果消息是非持久化的,或者Session是基于事务的,建议开发者不要关闭异步发送;这是提升Producer发送效率的重要的策略。设定合适的windowSize,开启Broker端“Flow Control”等,这既可以提高Produer发送效率,还能避免因Producer数据过大造成Broker不稳定的问题。

 

    4) 事务:对于Producer而言,使用事务并不会消耗Broker太多的性能,主要是会占用内存,所有未提交的事务消息,都会保存在内存中,有些基于日志的存储存储器,事务类型的持久化消息暂存在额外的文件中,直到日志提交或者回滚后清除。所以,Producer端不要在事务中,积压太多的消息,尽可能早的提交事务。

 

    5) 提升Consumer消费速率,无论是Queue还是Topic,快速的Consumer,无疑是提升整体效能的最好的手段,开发者需要在Consumer个数和消费耗时等多个方面权衡,尽可能通过良好的架构设计,让Consumer的消费速率和Producer的生产速率保持同步。

 

    6) 选择合适的存储器: activeMQ目前支持JDBC/kahadb/LevelDB三种主要的存储器。JDBC主要面向基于RDBMS方向,通常如果消息不仅面向ActiveMQ,还可能被用于第三方平台的操作,JDBC的特点就是透明度高,可扩展方案较多(但扩展成本较高)。kahadb和LevelDB,同属于日志存储 + BTree索引,性能很好,对于消息较多(单位尺寸较小),消费速度较快的应用,是最好的选择,这两种存储器也是最常用的,其中LevelDB是被推荐使用的。

 

12. ActiveMQ Producer与Spring代码样例

    1) MessageProducerClient.java

public class MessageProducerClient {
	
    private ActiveMQConnectionFactory connectionFactory;
    
    private String destination;
    
    private MessageProducer producer;
    
    private ActiveMQConnection connection;
    
    private Session session;
    
    private int deliveryMode = DeliveryMode.PERSISTENT;
    
    public String getDestination() {
		return destination;
	}


	public void setDestination(String destination) {
		this.destination = destination;
	}

	public void setDeliveryMode(int deliveryMode) {
		this.deliveryMode = deliveryMode;
	}


	public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
		this.connectionFactory = connectionFactory;
	}
	
	public void init() throws Exception{
		connection = (ActiveMQConnection)connectionFactory.createConnection();
		connection.start();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination physical = ActiveMQDestination.createDestination(destination, ActiveMQDestination.QUEUE_TYPE);
		producer = session.createProducer(physical);
		producer.setDeliveryMode(deliveryMode);
	}

    public void send(String text) throws Exception{
    	TextMessage message = session.createTextMessage(text);
    	producer.send(message);
    }
    
    public void send(Message message) throws Exception{
    	producer.send(message);
    }
    
    
    public void close(){
    	try{
    		connection.close();
    	}catch(Exception e){
    		e.printStackTrace();
    	}
    }
}

    2) spring配置

<bean id="activeMQConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
	<property name="brokerURL" value="tcp://localhost:61616"/>
	<property name="userName" value=""/>
	<property name="password" value=""/>
	<property name="sendTimeout" value="3000" />
</bean>

<bean id="orderMessageProducerClient" class="com.test.common.MessageProducerClient" init-method="init" destroy-method="close">
	<property name="connectionFactory" ref="activeMQConnectionFactory"></property>
	<property name="destination" value="queue://order.queue"></property>
</bean>

  

 

 

结语:

    我们从大体上剖析了和Producer、Message有关的几个重要属性,开发者需要再合适的场景中使用它们。如果本文有错误之处,敬请各位不吝赐教,批评斧正。谢谢!

    有关消息权重部分,[请参阅]

    1. ActiveMQMessageProducer

    2. TransportConnection

    3. Broker,BrokerService,PolicyEntry

    4. TransactionBroker,TransactionStore

    5. PrioritizedPendingList

    

  • 大小: 54.9 KB
  • 大小: 59.2 KB
分享到:
评论
22 楼 QING____ 2015-06-26  
csuzm0613 写道
问下博主,最近在使用spring集成activemq5.5时,发现在consumer断,确认模式设置成Client_Acknowledge时,MessageListener的onMessage()方法执行完成时消息被确认。而设置为Auto_Acknowledge时,一进入onMessage方法,消息就被确认了。这个和网上文章说的貌似不太一样,问下博主有没有遇到这种情况?

具体参见:http://shift-alt-ctrl.iteye.com/blog/2035321,文章的最下面有一些简单的介绍。
spring集成ActiveMQ之后,其实已经剥夺了开发者ACK消息的“权利”,如果开发者希望自己决定是否ACK,那么需要将模式设置为Client_ACK,而且当不希望消息被确认时,手动在onMessage方法中抛出异常来阻止spring确认消息。
21 楼 csuzm0613 2015-06-25  
问下博主,最近在使用spring集成activemq5.5时,发现在consumer断,确认模式设置成Client_Acknowledge时,MessageListener的onMessage()方法执行完成时消息被确认。而设置为Auto_Acknowledge时,一进入onMessage方法,消息就被确认了。这个和网上文章说的貌似不太一样,问下博主有没有遇到这种情况?
20 楼 QING____ 2014-04-25  
wabe033 写道
楼主,你好,求指点:

我想配置一个client id 发送队列中最多只有一条消息,以时间先后为顺序,后来的消息加入队列,先前的消息踢出队列,应该怎么配置呢?


你的问题,就涉及到了activeMQ消息的"剔除策略",我们可以通过配置,来决定队列中最多保留的旧数据的个数,不过这种策略只能对Topic有效,配置方式,参见如下:
<policyEntry topic="uniqueMessageTopic">
<messageEvictionStrategy>
<uniquePropertyMessageEvictionStrategy propertyName="uniqueProperty" />
</messageEvictionStrategy>
</policyEntry>
你需要指定propertyName,这样可以保证在Topic中,此属性值一样的消息,只会保留最新的一条.
19 楼 wabe033 2014-04-25  
楼主,你好,求指点:

我想配置一个client id 发送队列中最多只有一条消息,以时间先后为顺序,后来的消息加入队列,先前的消息踢出队列,应该怎么配置呢?
18 楼 xiaohu0901 2014-04-04  
QING____ 写道
xiaohu0901 写道
QING____ 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。

写入临时文件也不起效呢,现在memory使用到100%,broker log都是提示某个消费者使用memory 达到设置的值


亲,消息首先写入内存,如果内存满了,则写入临时文件,当内存中的旧消息被消费后,则临时文件中的消息会被pageIn到内存;如果消费者很慢,那么内存将永远是满的,直到临时文件中的消息被消费完毕。


请你检查一下,broker上的配置,persistent属性是否为true:
<broker persistent="true">...

persistent我配置的非持久化<broker persistent="false">,使用fileCursor,当有消费者消费慢,按你说的它可以写入临时文件,我们现在的现象时不写临时文件,producer被阻塞不发消息

hi,如果broker的persistent=false,那么意味着对于“非持久化”消息的pending部分,将不会写入临时文件,而是直接存储在VM内存中,将在内存中构建一个排序链表来存储这些消息。建议你参考StoreQueueCursor源码。


是否可总结一句话,生产者和消费者速率最好完美一致,不然都存在一定问题!
17 楼 QING____ 2014-04-04  
xiaohu0901 写道
QING____ 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。

写入临时文件也不起效呢,现在memory使用到100%,broker log都是提示某个消费者使用memory 达到设置的值


亲,消息首先写入内存,如果内存满了,则写入临时文件,当内存中的旧消息被消费后,则临时文件中的消息会被pageIn到内存;如果消费者很慢,那么内存将永远是满的,直到临时文件中的消息被消费完毕。


请你检查一下,broker上的配置,persistent属性是否为true:
<broker persistent="true">...

persistent我配置的非持久化<broker persistent="false">,使用fileCursor,当有消费者消费慢,按你说的它可以写入临时文件,我们现在的现象时不写临时文件,producer被阻塞不发消息

hi,如果broker的persistent=false,那么意味着对于“非持久化”消息的pending部分,将不会写入临时文件,而是直接存储在VM内存中,将在内存中构建一个排序链表来存储这些消息。建议你参考StoreQueueCursor源码。
16 楼 xiaohu0901 2014-04-03  
QING____ 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。

写入临时文件也不起效呢,现在memory使用到100%,broker log都是提示某个消费者使用memory 达到设置的值


亲,消息首先写入内存,如果内存满了,则写入临时文件,当内存中的旧消息被消费后,则临时文件中的消息会被pageIn到内存;如果消费者很慢,那么内存将永远是满的,直到临时文件中的消息被消费完毕。


请你检查一下,broker上的配置,persistent属性是否为true:
<broker persistent="true">...

persistent我配置的非持久化<broker persistent="false">,使用fileCursor,当有消费者消费慢,按你说的它可以写入临时文件,我们现在的现象时不写临时文件,producer被阻塞不发消息
15 楼 QING____ 2014-04-03  
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。

写入临时文件也不起效呢,现在memory使用到100%,broker log都是提示某个消费者使用memory 达到设置的值


亲,消息首先写入内存,如果内存满了,则写入临时文件,当内存中的旧消息被消费后,则临时文件中的消息会被pageIn到内存;如果消费者很慢,那么内存将永远是满的,直到临时文件中的消息被消费完毕。


请你检查一下,broker上的配置,persistent属性是否为true:
<broker persistent="true">...
14 楼 QING____ 2014-04-03  
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。

写入临时文件也不起效呢,现在memory使用到100%,broker log都是提示某个消费者使用memory 达到设置的值


亲,消息首先写入内存,如果内存满了,则写入临时文件,当内存中的旧消息被消费后,则临时文件中的消息会被pageIn到内存;如果消费者很慢,那么内存将永远是满的,直到临时文件中的消息被消费完毕。
13 楼 xiaohu0901 2014-04-02  
QING____ 写道
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。

写入临时文件也不起效呢,现在memory使用到100%,broker log都是提示某个消费者使用memory 达到设置的值
12 楼 QING____ 2014-04-02  
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。

这种是不是只针对同步发送方式,异步的不等broker响应,应该不需要这个windowsize吧


同步发送,就是传说中的request<->response,就是发送消息之后,producer阻塞直到broker端返回ack信息,所以这种情况下不需要windowsSize。异步发送,就是oneway,发送消息之后,producer直接从调用过程中返回,且将本地window的消耗 += message.size,当producer底层接收到ack之后,将window消耗 -= message.size,如果producer在发送时,发现windowSize已经满载,则阻塞直到接收到ack。

我觉得异步实现方式的时候,它其实没必要再去+message.size,因为它已经发出去了


呵呵,原因很简单,broker的吞吐能力有限,producer不能无限制的发送消息,我们需要在Client端“自觉”的让Producer慢下来。

这种方式我觉得broker提供限流配置会比较好点,超过处理能力的告知客户端错误,现在这种容易把client和broker都拖死!


是的,通常情况下,我们会将“windowSize” + “broker端Flow Control”配合使用。
11 楼 xiaohu0901 2014-04-02  
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。

这种是不是只针对同步发送方式,异步的不等broker响应,应该不需要这个windowsize吧


同步发送,就是传说中的request<->response,就是发送消息之后,producer阻塞直到broker端返回ack信息,所以这种情况下不需要windowsSize。异步发送,就是oneway,发送消息之后,producer直接从调用过程中返回,且将本地window的消耗 += message.size,当producer底层接收到ack之后,将window消耗 -= message.size,如果producer在发送时,发现windowSize已经满载,则阻塞直到接收到ack。

我觉得异步实现方式的时候,它其实没必要再去+message.size,因为它已经发出去了


呵呵,原因很简单,broker的吞吐能力有限,producer不能无限制的发送消息,我们需要在Client端“自觉”的让Producer慢下来。

这种方式我觉得broker提供限流配置会比较好点,超过处理能力的告知客户端错误,现在这种容易把client和broker都拖死!
10 楼 QING____ 2014-04-02  
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。

这种是不是只针对同步发送方式,异步的不等broker响应,应该不需要这个windowsize吧


同步发送,就是传说中的request<->response,就是发送消息之后,producer阻塞直到broker端返回ack信息,所以这种情况下不需要windowsSize。异步发送,就是oneway,发送消息之后,producer直接从调用过程中返回,且将本地window的消耗 += message.size,当producer底层接收到ack之后,将window消耗 -= message.size,如果producer在发送时,发现windowSize已经满载,则阻塞直到接收到ack。

我觉得异步实现方式的时候,它其实没必要再去+message.size,因为它已经发出去了


呵呵,原因很简单,broker的吞吐能力有限,producer不能无限制的发送消息,我们需要在Client端“自觉”的让Producer慢下来。
9 楼 xiaohu0901 2014-04-02  
QING____ 写道
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。

这种是不是只针对同步发送方式,异步的不等broker响应,应该不需要这个windowsize吧


同步发送,就是传说中的request<->response,就是发送消息之后,producer阻塞直到broker端返回ack信息,所以这种情况下不需要windowsSize。异步发送,就是oneway,发送消息之后,producer直接从调用过程中返回,且将本地window的消耗 += message.size,当producer底层接收到ack之后,将window消耗 -= message.size,如果producer在发送时,发现windowSize已经满载,则阻塞直到接收到ack。

我觉得异步实现方式的时候,它其实没必要再去+message.size,因为它已经发出去了
8 楼 QING____ 2014-04-02  
xiaohu0901 写道
QING____ 写道
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。

这种是不是只针对同步发送方式,异步的不等broker响应,应该不需要这个windowsize吧


同步发送,就是传说中的request<->response,就是发送消息之后,producer阻塞直到broker端返回ack信息,所以这种情况下不需要windowsSize。异步发送,就是oneway,发送消息之后,producer直接从调用过程中返回,且将本地window的消耗 += message.size,当producer底层接收到ack之后,将window消耗 -= message.size,如果producer在发送时,发现windowSize已经满载,则阻塞直到接收到ack。
7 楼 xiaohu0901 2014-04-02  
QING____ 写道
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。

这种是不是只针对同步发送方式,异步的不等broker响应,应该不需要这个windowsize吧
6 楼 QING____ 2014-04-01  
xiaohu0901 写道
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??

其实这个问题是有个前提的,如果消息密集,那就意味着将有更多的消息需要在producer client端的window中buffer起来,直到broker接收成功。这里也要求开发者,根据自己的实际情况指定合适的windowSize。这个windowSize就是内存消耗。
5 楼 xiaohu0901 2014-04-01  
我们通常在消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,就是需要消耗较多的Client端内存

=================================================================================
这个地方为什么异步会增加client端内存??
4 楼 QING____ 2014-04-01  
xiaohu0901 写道
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

在本人的博文:http://shift-alt-ctrl.iteye.com/blog/2035321,提到了“慢速消费者”的问题,以及如何解决“非持久化”情况下内存过载时的问题。【提示:写入临时文件】

如果,还有疑问,请继续。谢谢。
3 楼 xiaohu0901 2014-04-01  
不知道是我理解没到位还是什么原因,配置了memoryUsage的这个参数,我理解是当使用的memory达到设定值,它就会自动释放。实际情况不是这样的,现在只要cosumer的消费慢,如果发送的量大,很快memory使用就到100了,broker就僵死。目前使用非持久化,且配置producerflowcontroll=true

相关推荐

Global site tag (gtag.js) - Google Analytics