看这篇文章之前,我相信大家已经有过ActiveMQ的基本知识,已经知道JmsTemplete、MessageListenerContainer、Connection、Session、ConnectionFactory等相关知识,在后续的介绍里,还是会简单的进行介绍,以便更好的了解。
1、项目搭建
和SpringBoot整合之前,我们先导入相关依赖包,如下所示:
<!--activemq依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!--测试依赖包-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
application.properties相关配置:
#配置ActiveMq
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.password=admin
spring.activemq.user=admin
#队列名称
activemq.first.queue=FIRST_QUEUE
下面我们先实现生产者和消费者:
JmsTemplate是消息处理核心类(线程安全),被用作消息的发送和接受,在发送或者是消费消息的同时也会对所需资源进行创建和释放。消息消费采用receive方式(同步、阻塞的),这里不做具体描述。关于消息的发送,常用的方式为send()以及convertAndSend()两种,其中send()发送需要我们自己指定消息格式,使用convertAndSend可以根据定义好的MessageConvert自动转换消息格式。大家请注意,在自定义JmsTemplate时,一定要指定ConnectionFactory,否则会出错。
生产者,使用JmsTemplate.convertAndSend()实现消息的发送。
@Component
@Slf4j
public class FirstQueueProducer {
@Resource
private JmsTemplate jmsTemplate;
/**
* 用于发送消息到mq服务
* @param destination queue或者topic
* @param message 消息体
*/
public void send(String destination, String message) {
this.jmsTemplate.convertAndSend(destination,message);
log.info("发送消息成功,发送方式:{},发送内容:{}",destination,message);
}
}
消费者,这里使用@JmsListener监听队列消息,大家请注意,如果我们不在@JmsListener中指定containerFactory,那么将使用默认配置,默认配置中Session是开启了事物的,即使我们设置了手动Ack也是无效的。在后续配置containerFactory的时候会给大家提到这个地方的坑。
@Component
@Slf4j
public class FirstQueueConsumer {
@JmsListener(destination = "${activemq.first.queue}")
public void consumer(ActiveMQMessage message) throws JMSException {
log.info("接受队列消息,内容为:{}",message.getStringProperty("value"));
}
}
到目前为止,一个简单的生产和消费就实现了,大家可以测试一下,试试看。在测试的时候,如果不生效,请在启动类上添加@EnableJms。大家测试,或许应该发现了,当使用默认配置的时候,在不出现异常的情况下,消息会被成功消费,如果消费过程中出现异常,即使已经获取到消息也会消费失败,消息还是保留在消息队列中。这就是上述给大家说的,采用默认配置,Session是自动开启了事物的,如果消费成功(不出现任何异常的情况下)会提交事物,否则会回滚事物。
2、手动ACK
默认的配置已经能够满足大部分的业务,但是在某些情况下,我们可能需要手动确认消息的消费,这样我们就不得不修改默认的配置。在修改默认配置之前,我们先来了解一下ACK_MOD的几种类型:
int AUTO_ACKNOWLEDGE = 1;自动确认
int CLIENT_ACKNOWLEDGE = 2;客户端手动确认
int DUPS_OK_ACKNOWLEDGE = 3;批量自动确认
int SESSION_TRANSACTED = 0;事物提交确认
这几种是javax.jms.Session提供给客户端使用,如果我们想手动确认消息,那么肯定是需要CLIENT_ACKNOWLEDGE = 2这个值了,真的吗?其实即使我们把commit模式修改为2并且关闭事物,也不会起到任何作用,大家不妨试一试,源码如下:
//org.springframework.jms.listener.AbstractMessageListenerContainer
protected void commitIfNecessary(Session session, Message message) throws JMSException {
if (session.getTransacted()) {//是否开启事物
if (this.isSessionLocallyTransacted(session)) {
JmsUtils.commitIfNecessary(session);//进行事物提交
}
//判断消息不为空并且是否设置为CLIENT_ACKNOWLEDGE
} else if (message != null && this.isClientAcknowledge(session)) {
message.acknowledge();
}
}
//org.springframework.jms.support.JmsAccessor
protected boolean isClientAcknowledge(Session session) throws JMSException {
return (session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE);
}
从源码中可以发现,即使我们设置ACK_MOD为CLIENT_ACKNOWLEDGE,也是不起作用的,Spring在判断的时候,直接转化为自动提交了。难道就不能实现手动提交了吗?当然不是,ActiveMQ在jms的基础上新添加了一个模式:INDIVIDUAL_ACKNOWLEDGE = 4(单条消息确认)。使用该模式就能够满足我们的需要,下面简单介绍几种使用该模式的方式:
3、自定义JmsTemplate
最简单,最直接的方式就是自定义JmsTemplate,如下:
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory factory) {
JmsTemplate jmsTemplate = new JmsTemplate();
//关闭事物
jmsTemplate.setSessionTransacted(false);
//设置为单条消息确认
jmsTemplate.setSessionAcknowledgeMode(4);
jmsTemplate.setConnectionFactory(factory);
return jmsTemplate;
}
在消费消息时,直接使用该JmsTemplate.receive操作消息即可。较少使用。
4、使用DefaultMessageListenerContainer
DefaultMessageListenerContainer继承AbstractPollingMessageListenerContainer,采用consumer.receive来接受消息,支持XA transaction,receive方式是同步的,阻塞的,依赖多线程(taskExecutor)处理消息。
采用该方式手动Ack代码如下:
@Bean
public DefaultMessageListenerContainer defaultMessageListenerContainer(ConnectionFactory connectionFactory){
DefaultMessageListenerContainer defaultMessageListenerContainer=new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setSessionAcknowledgeMode(4);//设置单条消息确认
defaultMessageListenerContainer.setDestinationName("FIRST_QUEUE");//需要指定队列
defaultMessageListenerContainer.setSessionTransacted(false);//关闭事物,否则不生效
//消费消息
defaultMessageListenerContainer.setMessageListener((MessageListener) message -> {
try {
System.out.println("消费消息:"+message);
//手动确认消息,如若不确认,消息将一直存在于消息队列中
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
});
defaultMessageListenerContainer.setConnectionFactory(connectionFactory);
return defaultMessageListenerContainer;
}
在验证该方式之前,需要把之前写的FirstQueueConsumer消费者注释掉,以免被它消费自动确认,影响结果。使用该方式需要明确指定destination和messageListener。也就是说有多少队列或主题就需要定义多少个这样的Bean。
5、使用SimpleJmsListenerContainerFactory
SimpleJmsListenerContainerFactory内部使用SimpleMessageListenerContainer,该容器使用简单,不支持外部事物,另外该方式支持持有多个MessageConsumer实例,并且它提供了两种方式处理消息:线程池(taskExecutor)和MessageConsumer.setMessageListener。源码如下:
protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
Destination destination = getDestination();
if (destination == null) {
destination = resolveDestinationName(session, getDestinationName());
}
MessageConsumer consumer = createConsumer(session, destination);
//当线程池不为空
if (this.taskExecutor != null) {
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message message) {
//使用线程池消费消息
taskExecutor.execute(new Runnable() {
@Override
public void run() {
processMessage(message, session);
}
});
}
});
}
else {
//使用MessageConsumer.setMessageListener()处理消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
processMessage(message, session);
}
});
}
return consumer;
}
在我们了解该容器之后,再来看看该容器如何实现手动Ack,代码如下:
@Bean
public SimpleJmsListenerContainerFactory firstFactory(ConnectionFactory connectionFactory) {
SimpleJmsListenerContainerFactory factory=new SimpleJmsListenerContainerFactory();
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(4);
factory.setConnectionFactory(connectionFactory);
return factory;
}
定义好容器之后,我们就需要在消费者上去使用它,如下:
@JmsListener(destination = "${activemq.first.queue}" ,containerFactory = "firstFactory")
大家在测试的时候,也可以在消费者方法里输出Session对象中的事物开启状态和Ack模式,以便验证是否成功,如下:
@JmsListener(destination = "${activemq.first.queue}" ,containerFactory = "firstFactory")
public void consumer(ActiveMQMessage message,Session session) throws Exception {
System.out.println(session.getAcknowledgeMode());
System.out.println(session.getTransacted());
log.info("接受队列消息,内容为:{}",message);
message.acknowledge();
}
@JmsListener中containerFactory可以指定容器工厂,Spring提供的容器工厂有两种: DefaultJmsListenerContainerFactory和SimpleJmsListenerContainerFactory。网上很多示例使用的都是DefaultJmsListenerContainerFactory,例如:
@Bean
public DefaultJmsListenerContainerFactory firstFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory = new DefaultJmsListenerContainerFactory();
defaultJmsListenerContainerFactory.setSessionTransacted(false);
defaultJmsListenerContainerFactory.setSessionAcknowledgeMode(4);
configurer.configure(defaultJmsListenerContainerFactory, connectionFactory);
return defaultJmsListenerContainerFactory;
}
但是此方法在我这里并没有生效,不知是否是我这里使用有误,如果大家可以实现,望不吝赐教。
6、总结
上述内容是我在SpringBoot与ActiveMQ整合实现消息手动确认时,查阅网上资料,根据个人感悟所得,其中若有不好或是不正确的地方,望大家不吝指教,共同进步。