前面讲到消费者通过receive()方法来接收消息,它是通过同步阻塞的方式来获取消息的,此时程序一直是阻塞的。那么有没有其他方法获取到消息呢?答案是有的,那就是使用MessageListener。
package com.wsy.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.0.101:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
// 创建连接工厂,按照给定的url地址采用默认的用户名和密码
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 通过连接工厂,获取Connection并启动
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 创建Session
// 有两个参数,第一个是事务,第二个是签收,后面详细介绍
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地(目的地有两个子接口,分别是Queue和Topic)
Queue queue = session.createQueue(QUEUE_NAME);
// 创建消费者,指明从queue取消息
MessageConsumer messageConsumer = session.createConsumer(queue);
// 通过监听器的方式来消费消息
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者消费:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();// 用于阻塞住线程,让消费者消费完成才能关闭
// 按照资源打开的相反顺序关闭资源
messageConsumer.close();
session.close();
connection.close();
}
}
注意最后这里的System.in.read();这条语句,如果不写这条语句,消费者在处理消息的时候,可能还没有处理完,就被后面的关闭资源给关闭了。当然,这里也可以换成Thread.sleep();的形式,设置一个合理的时间。
如果资源关闭的过快,就会出现这个情况,只消费了部分消息,资源就被关闭了,如下图所示。
|