启动类添加注解:@EnableJms
1、pom
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>1.5.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
2、application.properties
activemq.url=tcp://127.0.0.1:61616
activemq.user=admin
activemq.password=admin
3、初始化对象配置
@Configuration
public class BeanConfig {
@Value("${activemq.url}")
private String ActiveMQ_URL;
@Value("${activemq.user}")
private String ActiveMQ_USER;
@Value("${activemq.password}")
private String ActiveMQ_PASSWORD;
/**
@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(ActiveMQ_URL);
connectionFactory.setUserName(ActiveMQ_USER);
connectionFactory.setPassword(ActiveMQ_PASSWORD);
return connectionFactory;
} */
/**
* 参考介绍:https://www.cnblogs.com/xingzc/p/5943165.html
* @return 解决activeMq 一直创建连接的问题
*/
@Bean("connectionFactory")
public CachingConnectionFactory connectionFactory(){
ActiveMQConnectionFactory amqFactory = new ActiveMQConnectionFactory(ActiveMQ_USER, ActiveMQ_PASSWORD, ActiveMQ_URL);
CachingConnectionFactory ccf = new CachingConnectionFactory(amqFactory);
ccf.setSessionCacheSize(50);
ccf.setCacheProducers(true);
return ccf;
}
@Bean
public ConvertJmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
ConvertJmsTemplate jmsTemplate = new ConvertJmsTemplate();
jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//进行持久化配置 1表示非持久化,2表示持久化
jmsTemplate.setConnectionFactory(connectionFactory);
jmsTemplate.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);//客户端签收模式
return jmsTemplate;
}
@Bean
public JmsMessagingTemplate jmsMessageTemplate(ConnectionFactory connectionFactory) {
return new JmsMessagingTemplate(connectionFactory);
}
// 自动设置JMS重连JmsListenerContainerFactory
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory jmsListenerFactory = new DefaultJmsListenerContainerFactory();
jmsListenerFactory.setPubSubDomain(false);
jmsListenerFactory.setConnectionFactory(connectionFactory);
return jmsListenerFactory;
}
@Bean
public Session createConnSession(ConnectionFactory connectionFactory) throws JMSException {
Connection conn = connectionFactory.createConnection();
Session session = (ActiveMQSession)conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
return session;
}
-
ConvertJmsTemplate.java
-
解决生产者producer多次重连断开的问题
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.JmsUtils;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Objects;
public class ConvertJmsTemplate extends JmsTemplate {
private Logger logger = LoggerFactory.getLogger(ConvertJmsTemplate.class);
@Autowired
private Session session;
private MessageProducer producer;
public void sendQueue(ActiveMQQueue queue, final Object message) {
try {
if (Objects.isNull(producer)) {
producer = super.createProducer(session, queue);
}
doSend(producer, super.getMessageConverter().toMessage(message, session));
// Check commit - avoid commit call within a JTA transaction.
if (session.getTransacted() && isSessionLocallyTransacted(session)) {
// Transacted session created by this template -> commit.
JmsUtils.commitIfNecessary(session);
}
} catch (Exception e) {
logger.error("error {}", e);
}
}
}
4、生产者与消费者
@JmsListener:注解默认支持自动重连机制
- 消费者
@Component
public class Consumer {
/*
* 监听和读取消息
*/
@JmsListener(destination = "active.topic")
public void readActiveTopic(String message) {
System.out.println("接受到2:" + message);
}
}
- 生产者
@RestController
public class ProducerController {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
/*
* 消息生产者
*/
@RequestMapping("/sendmsg")
public void sendmsg(String msg) {
// 指定消息发送的目的地及内容
this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
}
}
5、扩展
5.1、自定义动态监听器#
1、 ConnectionDefaultListener;
public class ConnectionDefaultListener {
private Logger logger = LoggerFactory.getLogger(ConnectionDefaultListener.class);
// 消费者
private MessageConsumer messageConsumer;
// 连接工厂
ConnectionFactory connectionFactory;
// 连接对象
private Connection conn;
// session
private Session session;
private Boolean isRunning = true;
// 连接信息
private ConnectMqtt connectMqtt;
// 监听器
private MessageListener messageListener;
// ConnectMqtt 业务参数
public ConnectionDefaultListener(ConnectionFactory connectionFactory, ConnectMqtt connectMqtt) throws Exception {
this.connectMqtt = connectMqtt;
this.connectionFactory = connectionFactory;
this.messageListener = new MyJmsListenerConsumer(connectMqtt);
createConnect();
createConnSession();
}
/**
* 创建新连接
*/
private void createConnect() throws Exception {
this.conn = connectionFactory.createConnection();
this.conn.start();
}
public void start() throws Exception {
// 注册messageConsumer
registerMessageConsumer();
// 5S执行一次
new JmsListenerThread().registerConnectionDefaultListener(this);
}
/**
* 注册MessageConsumer
*/
private void registerMessageConsumer() {
try {
// 设置监听的队列名称
Destination destination = this.session.createQueue(connectMqtt.getQueue());
// 创建一个消费者
MessageConsumer consumer = this.session.createConsumer(destination);
addMessageConsumer(consumer);
this.isRunning = Boolean.TRUE;
logger.info("activeMq create success, destination: {}", connectMqtt.getQueue());
} catch (Exception e) {
logger.error("jms监听器创建失败Exception: {}", e);
logger.error("jms监听器创建失败: {}", connectMqtt.getQueue());
}
}
/**
* 添加客户端
*
* @param consumer
*/
public void addMessageConsumer(MessageConsumer consumer) {
if (this.messageConsumer != null) {
// 手动清除对象
this.messageConsumer = null;
}
this.messageConsumer = consumer;
}
/**
* 创建session
*
* @return
* @throws JMSException
*/
public void createConnSession() throws JMSException {
Session session = this.conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
this.session = session;
}
/**
* 校验是否建立连接
* 没连接抛出异常
*
* @param timeout
* @return
* @throws JMSException
*/
protected Message receiveFromConsumer(long timeout) throws JMSException {
if (timeout > 0L) {
return this.messageConsumer.receive(timeout);
} else {
return timeout < 0L ? this.messageConsumer.receiveNoWait() : this.messageConsumer.receive();
}
}
/**
* 消费消息
*
* @param message
*/
public void messageInvoke(Message message) {
try {
this.messageListener.onMessage(message);
} catch (Exception e) {
logger.error("messageInvoke error {}", e);
}
}
/**
* 重连逻辑
*/
public synchronized void releaseConnection() {
if (isRunning) {
return;
}
try {
close();
// 创建连接
createConnect();
// 创建session
createConnSession();
// 创建队列
registerMessageConsumer();
this.isRunning = Boolean.TRUE;
} catch (Exception e) {
logger.error("jms releaseConnect error {}", e);
}
}
// 关闭 连接
public void close() {
try {
if (null != this.messageConsumer) {
this.messageConsumer.close();
}
if (null != this.session) {
session.close();
}
if (null != this.conn) {
this.conn.close();
}
} catch (Exception e) {
logger.error("close error:{}", e);
}
}
}
1、 ;
/**
* 自定义JMS监听
*/
public class MyJmsListenerConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MyJmsListenerConsumer.class);
private ConnectMqtt connectMqtt;
public MyJmsListenerConsumer(ConnectMqtt connectMqtt) {
this.connectMqtt = connectMqtt;
}
@Override
public void onMessage(Message message) {
try {
//接收文本消息
if (message instanceof TextMessage) {
TextMessage text = (TextMessage) message;
logger.info("ActiveMq 接收消息: {}", text.getText());
...业务逻辑
logger.info("process end");
} else {
logger.warn("消息不支持接受");
}
} catch (Exception e) {
// 报错日志;
logger.error("error: {}", e.getMessage());
}
}
版权声明:「DDKK.COM 弟弟快看,程序员编程资料站」本站文章,版权归原作者所有