`
QING____
  • 浏览: 2232500 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ编程实例

    博客分类:
  • JMS
 
阅读更多

    本文主要展示如何使用activeMQ进行程序设计,可以作为代码实例参考;此后会继续补充有关JMS 或ActiveMQ的优化和架构部分。

    本实例主要展示如何使用Queue。

 

一.pom.xml

 

<dependencies>
    	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>3.2.3.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jms</artifactId>
		<version>3.2.3.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>commons-lang</groupId>
		<artifactId>commons-lang</artifactId>
		<version>2.4</version>
	</dependency>
	<dependency>
		<groupId>javax.jms</groupId>
		<artifactId>jms</artifactId>
		<version>1.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.qpid</groupId>
		<artifactId>proton-jms</artifactId>
		<version>0.3</version>
	</dependency>
           
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.8.0</version>
	</dependency>
</dependencies>
<build>
	<finalName>test-jms-1.0</finalName>
	<resources>
	    <resource>
		<directory>src/main/resources</directory>
		<filtering>true</filtering>
	    </resource>
	</resources>
</build>
    其中“proton-jms”是需要声明的,否则可能无法正常运行。

 

 

二.Java实例(非spring环境)

    对JMS的程序部分,推荐使用JNDI + 异步listener方式;所以接下来的例子将采取此方式。

    1) jndi.properties----[/src/main/resources/jndi.properties]

 

###contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
###brokerUrl,any protocol
java.naming.provider.url = tcp://localhost:61616
##username
##java.naming.security.principal=
##password
##java.naming.security.credentials=
##connectionFactory,for building sessions
connectionFactoryNames = QueueCF,TopicCF
##topic.<topicName> = <physicalName-of-topic>
##your application should use <topicName>,such as:
## context.lookup("topic1");
##It can be more than once
topic.topic1 = jms.topic1
##queue.<topicName> = <physicalName-of-queue>
queue.queue1 = jms.queue1
   
    2) QueueMessageListener.java(消息异步监听器)

 

public class QueueMessageListener implements MessageListener{

	public void onMessage(Message message) {
		if(message == null){
			return;
		}
		try{
			if(message instanceof TextMessage){
				String text = ((TextMessage)message).getText();
				System.out.println("-----JMS Message header-----");
				//message的关联id,可以在发送消息时指定,用于描述消息的关联性
				System.out.println("CorrelationID :" + message.getJMSCorrelationID());
				//消息的“传送模式”,1:非持久,2:持久
				System.out.println("DeliveryMode :" + message.getJMSDeliveryMode());
				//消息的过期时间,毫秒数;如果在发送消息时指定了timeToLive,此值为timestap + timeToLive
				System.out.println("Expiration :" + message.getJMSExpiration());
				//消息ID,全局唯一
				System.out.println("MessageID :" + message.getJMSMessageID());
				//消息权重,参考属性
				System.out.println("Priority :" + message.getJMSPriority());
				//是否为“重发”;当一个消息发送给消费者之后,未能收到“确认”,将会导致消息重发
				System.out.println("Redelivered :" +message.getJMSRedelivered());
				//消息创建的时间戳,当消息发送时被赋值。
				System.out.println("Timestamp :" + message.getJMSTimestamp());
				//消息的类型
				System.out.println("Type :" + message.getJMSType());
				System.out.println("-----Message Properties-----");
				Enumeration<String> names = message.getPropertyNames();
				if(names != null){
					while(names.hasMoreElements()){
						String key = names.nextElement();
						System.out.println(key + ":" + message.getStringProperty(key));
					}
				}
				System.out.println(">>>>" + text);
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}
  
    3) SimpleConsumer.java(消息消费者)

     因为我们已经使用了“MessageListener”来异步接受消息,事实上JMS的实现中已经开启了单独的线程用来从网络中接受消息,并逐次调用onMessage方法;此处我们没有必要再次额外的创建线程。

 

public class SimpleConsumer {

	private Connection connection;
	private Session session;
	private MessageConsumer consumer;
	
	private boolean isStarted;
	
	public SimpleConsumer(MessageListener listener) throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		consumer = session.createConsumer(queue);
		consumer.setMessageListener(listener);
	}
	
	
	public synchronized boolean start(){
		if(isStarted){
			return true;
		}
		try{
			connection.start();//very important
			isStarted = true;
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		isStarted = false;
		try{
			session.close();
			connection.close();
		}catch(Exception e){
			//
		}
	}
}
  
    4) SimpleProductor.java(消息生产者)

 

public class SimpleProductor {

	private MessageProducer producer;
	private Session session;
	private Connection connection;
	private boolean isOpen = true;
	
	public SimpleProductor() throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		producer = session.createProducer(queue);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		
	}
	
	
	public boolean send(String message) {
		if(!isOpen){
			throw new RuntimeException("session has been closed!");
		}
		try{
			producer.send(session.createTextMessage(message));
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		try{
			if(isOpen){
				isOpen = false;
			}
			session.close();
			connection.close();
		}catch (Exception e) {
			//
		}
	}
	
}

 

    上面的程序,基本上可以完成简单的消息发送和接受,此外,还有一种不常用的方式:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(QUEUE);
MessageProducer producer = session.createProducer(queue);
//MessageConsumer consumer = session.createConsumer(queue)
//同步接收消息
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
while(true){
	Message message = consumer.receive(10000);
	if(message == null){
		continue;
	}
	if(message instanceof TextMessage){
		//...
	}
}

 

    5) 测试方法:

SimpleProductor productor = new SimpleProductor();
productor.start();
for(int i=0; i<10; i++){
	productor.send("message content:" + i);
}
productor.close();
SimpleConsumer consumer = new SimpleConsumer(new QueueMessageListener());
consumer.start();

//consumer.close();

 

三.Spring-jms实例

 

    1) 配置文件:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
	<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
	<constructor-arg index="0" value="queue1"></constructor-arg>
</bean>
<!-- productor -->
<bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
	<property name="connectionFactory" ref="connectionFactory"></property>
	<property name="defaultDestination" ref="queueDestination"></property>
	<!-- 是否在message中开启timestamp属性 -->
	<property name="messageTimestampEnabled" value="true"></property>
	<!-- 是否开启deliveryMode,priority,timeToLive消息附属属性 ,否则上述3个属性将采用默认值-->
	<property name="explicitQosEnabled" value="true"></property>
	<!-- NON_PERSISTENT = 1,PERSISTENT = 2,默认值为2-->
	<property name="deliveryMode" value="2"></property>
	<!-- pubSubNoLocal,对于topic而言,还需要注意此选项:是否接受本地消息,当消费者和生产者公用一个connection时 -->
</bean>
<bean id="productor" class="com.test.jms.spring.impl.ProductorImpl">
	<property name="jmsTemplate" ref="queueTemplate"></property>
</bean>
<!-- MDB -->
<bean id="queueMessageListener" class="com.test.jms.object.QueueMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"></property>
	<property name="destination" ref="queueDestination"></property>
	<property name="messageListener" ref="queueMessageListener"></property>
	<!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 
	<property name="taskExecutor" ref="queueMessageExecutor"></property>
	-->
	 <!--  -->
	<property name="concurrentConsumers" value="1"></property>
	<!-- [concurrentConsumers]-[maxConcurrentConsumers] -->
	<!--  
	<property name="concurrency" value="1-5"></property>
	-->
	
</bean>
<bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="2" />
	<property name="maxPoolSize" value="5" />
	<property name="daemon" value="true" />
	<property name="keepAliveSeconds" value="120" />
</bean>

    我们采用了spring推荐的方式:消息生产者基于jmsTemplate,消息消费者基于MDB(pojo的消息驱动bean);为了提升消息的接收者的吞吐能力,我们可以采取多线程的方式,其中有几个重要的参考配置:

    A) taskExecutor:消息接受端使用线程池的方式处理消息;当消息的网络输入速率大于消息的处理速率时,可以考虑采用此方式;在消息消费者的与JMS-Server的网络链接中,每收到一条消息,将会立即交付给线程池中的线程去执行,执行时仍然调用messageListener的方法;此处需要注意,线程池中的所有线程仍然共享一个messageListener实例,在采用线程池模式中,请注意线程安全问题。

    B) concurrentConsumers:并发运行的消费者个数,在默认情况下为一个“消息消费者”;事实上,一个consumer即为一个Session,多个consumer即为多个Session;不过它们底层仍然共享一个“tcp链接”;此配置项仍然是适用于“消息的网络输入速率大于消息的处理速率”的场景;每一个consumer都将会在单独的线程中运行,但是它们仍然共享一个messageListener实例;在此场景下,你无法绝对的保证,原本“有序”的消息在交付给多个consumer时被实际执行的顺序也是严格的。

    taskExecutor是一种额外的优化策略,concurrentConsumers则是采用了JMS原生的特性;在实际场景中,我们选择一种即可。如下为Spring-JMS是如何使用线程池处理消息的原理(基于封装的思想):

if (this.taskExecutor != null) {
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(final Message message) {
			taskExecutor.execute(new Runnable() {
				public void run() {
					processMessage(message, session);
				}
			});
		}
	});
}
else {
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			processMessage(message, session);
		}
	});
}

 

    2) ProductorImpl.java

public class ProductorImpl implements Productor {

	private JmsTemplate jmsTemplate;
	
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void send(final String message) {
		if(message == null){
			return;
		}
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}

	public void send(final Map<String, String> message) {
		if(message == null || message.isEmpty()){
			return;
		}
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				MapMessage mm = session.createMapMessage();
				for(Entry<String, String> entry : message.entrySet()){
					mm.setString(entry.getKey(), entry.getValue());
				}
				return mm;
			}
		});
		
	}

	
}

    非常的简单,我们不用书写和consumer有关的代码。一切就结束了。[备注,本文中的实例命名为Productor,只是为了避免和JMS中producer混淆]

分享到:
评论

相关推荐

    基于Spring+JMS+ActiveMQ+Tomcat整合

    基于Spring+JMS+ActiveMQ+Tomcat,做一个Spring4.1.0和ActiveMQ5.11.1整合实例,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

    JAVA上百实例源码以及开源项目源代码

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

    JAVA上百实例源码以及开源项目

     Java数据压缩与传输实例,可以学习一下实例化套按字、得到文件输入流、压缩输入流、文件输出流、实例化缓冲区、写入数据到文件、关闭输入流、关闭套接字关闭输出流、输出错误信息等Java编程小技巧。 Java数组倒置...

    java开源包1

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包2

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包3

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包6

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包5

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包10

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包8

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包7

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包9

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包101

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包11

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    java开源包4

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    Java资源包01

    Spring4GWT GWT Spring 使得在 Spring 框架下构造 GWT 应用变得很简单,提供一个易于理解的依赖注入和RPC机制。 Java扫雷游戏 JVMine JVMine用Applets开发的扫雷游戏,可在线玩。 public class JVMine ...activemq...

    C#实现同Active MQ通讯的方法

    本文实例讲述了C#实现同Active MQ通讯的方法。分享给大家供大家参考,具体如下: 内容概要: 主要以源码的形式介绍如何用C#实现同Active MQ 的通讯。本文假设你已经正确安装JDK1.6.x,了解Active MQ并有一定的编程...

Global site tag (gtag.js) - Google Analytics