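1 Introduction to messaging domains
The JMS specification defines two messaging domains: the point-to-point domain (P2P) and the publish/subscribe domain (pub/sub). In the point-to-point domain a destination is called a queue; in the publish/subscribe domain it is called a topic.
The point-to-point domain has the following characteristics:
1. Each message has only one consumer. This does not mean that a queue can have only one consumer.
2. There is no timing dependency between producer and consumer. The consumer can fetch a message whether or not it was running when the producer sent it, much like a text message.
The publish/subscribe domain has the following characteristics:
1. Each message can have multiple consumers. Think of it as following a topic on Weibo. It really is a lot like Weibo, haha.
2. There is a timing dependency between producer and consumer. A consumer subscribed to a topic can only consume messages published after it subscribed; it does not receive earlier ones. The JMS specification allows clients to create durable subscriptions, which relaxes this timing dependency to some extent: a durable subscription lets a consumer receive messages that were sent while it was not active.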
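The difference between the two domains can be sketched with a small in-memory model. This is not JMS code and does not talk to a broker; the class names QueueModel and TopicModel are made up for illustration, and only non-durable topic subscriptions are modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** In-memory model of the two messaging domains; hypothetical names, not a JMS client. */
final class MessagingDomainsSketch {

    /** Point-to-point: messages wait in the queue and each one is handed out once. */
    static final class QueueModel {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String msg) {
            pending.addLast(msg);
        }

        /** Any consumer may call this; returns null when the queue is empty. */
        String receive() {
            return pending.pollFirst();
        }
    }

    /** Publish/subscribe: a message is copied to every subscriber registered at publish time. */
    static final class TopicModel {
        private final List<List<String>> subscribers = new ArrayList<>();

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void publish(String msg) {
            for (List<String> inbox : subscribers) {
                inbox.add(msg);
            }
        }
    }

    public static void main(String[] args) {
        QueueModel queue = new QueueModel();
        queue.send("q-0");            // sent before any consumer exists
        queue.send("q-1");
        String a = queue.receive();   // consumer A
        String b = queue.receive();   // consumer B gets a different message
        System.out.println("consumer A got " + a + ", consumer B got " + b);

        TopicModel topic = new TopicModel();
        List<String> early = topic.subscribe();
        topic.publish("t-0");
        List<String> late = topic.subscribe();   // subscribed too late for t-0
        topic.publish("t-1");
        System.out.println("early subscriber: " + early + ", late subscriber: " + late);
    }
}
```

In this model the queue keeps messages until someone fetches them, while the topic only copies a message to the inboxes that exist at publish time, which is the timing dependency described above.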
2 Shared helper methods
// Define ActiveMqContext
@Data
@AllArgsConstructor
public class ActiveMqContext {
    private Connection connection;
    private Session session;
    private Session listenerSession;
    private MessageProducer producer;
    private MessageConsumer consumer;
    private MessageConsumer listener;
    private boolean transaction;
}
// Shared send method
public static void sendMsg(ActiveMqContext context, String msg, int priority) throws JMSException {
    Session session = context.getSession();
    MessageProducer producer = context.getProducer();
    TextMessage message = session.createTextMessage(msg);
    message.setJMSPriority(priority);
    producer.send(message);
    if (context.isTransaction()) {
        session.commit();
    }
}
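The priority parameter above is a value from 0 to 9, with 4 as the JMS default. The JMS specification asks providers to do their best to deliver higher-priority messages first but does not require strict ordering. The following in-memory sketch shows what strict priority ordering would look like; PrioritySketch is a hypothetical class, not part of JMS or ActiveMQ, and a real broker may order messages differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** In-memory model of priority-ordered delivery; hypothetical, not a JMS client. */
final class PrioritySketch {
    /** Same value as the JMS default priority. */
    static final int DEFAULT_PRIORITY = 4;

    private static final class Entry {
        final String text;
        final int priority;
        final long seq;

        Entry(String text, int priority, long seq) {
            this.text = text;
            this.priority = priority;
            this.seq = seq;
        }
    }

    // Higher priority first; equal priorities keep their send order.
    private final PriorityQueue<Entry> pending = new PriorityQueue<>(
            Comparator.comparingInt((Entry e) -> -e.priority)
                      .thenComparingLong(e -> e.seq));
    private long nextSeq = 0;

    void send(String text, int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("priority must be between 0 and 9");
        }
        pending.add(new Entry(text, priority, nextSeq++));
    }

    /** Removes and returns every pending message in delivery order. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        Entry e;
        while ((e = pending.poll()) != null) {
            out.add(e.text);
        }
        return out;
    }

    public static void main(String[] args) {
        PrioritySketch model = new PrioritySketch();
        model.send("normal-1", DEFAULT_PRIORITY);
        model.send("urgent", 9);
        model.send("normal-2", DEFAULT_PRIORITY);
        model.send("low", 0);
        System.out.println(model.drain()); // prints [urgent, normal-1, normal-2, low]
    }
}
```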
// Shared receive method
public static void receiveMsg(ActiveMqContext context) throws JMSException {
    MessageConsumer consumer = context.getConsumer();
    // receive(timeout) blocks for at most timeout milliseconds and returns null if no message arrives in that time
    TextMessage message = (TextMessage) consumer.receive(60000);
    while (message != null) {
        log.info("receive msg:{}", message.getText());
        // Without a transaction, in client-acknowledge mode the client must call acknowledge() before the message counts as consumed
        message.acknowledge();
        if (context.isTransaction()) {
            // In a transacted session the message is only really consumed after commit()
            context.getSession().commit();
        }
        message = (TextMessage) consumer.receive(60000);
    }
}
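The shape of this receive loop (wait up to a timeout, stop when the timed wait returns null) can be tried without a broker. The sketch below uses a java.util.concurrent.BlockingQueue as a stand-in for the MessageConsumer; ReceiveLoopSketch and drain are hypothetical names, and acknowledgement is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Timed polling loop against an in-memory queue; a stand-in for a JMS consumer. */
final class ReceiveLoopSketch {

    /** Keeps polling until one poll waits the full timeout without getting a message. */
    static List<String> drain(BlockingQueue<String> source, long timeoutMillis)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        String msg;
        // poll(timeout, unit) returns null on timeout, which ends the loop
        while ((msg = source.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
            received.add(msg);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        for (int i = 0; i < 3; i++) {
            source.put("msg-" + i);
        }
        System.out.println(drain(source, 100)); // prints [msg-0, msg-1, msg-2]
    }
}
```

The timeout is the only thing that ends the loop, so a short timeout returns quickly on an empty queue while a long one keeps the caller waiting for late messages.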
// Shared listener method
public static void listenMsg(ActiveMqContext context) throws JMSException {
    MessageConsumer listener = context.getListener();
    Session listenerSession = context.getListenerSession();
    listener.setMessageListener((message) -> {
        try {
            log.info("listen msg:{}", ((TextMessage) message).getText());
            if (context.isTransaction()) {
                listenerSession.commit();
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    });
}
3 Point-to-point messaging test
3.1 Building the point-to-point ActiveMqContext
public static ActiveMqContext getQueueContext(Boolean transaction, int acknowledgeMode, int deliveryMode, String queueName) throws JMSException {
    Connection connection = factory.createConnection();
    connection.start();
    // In a transacted session, send() does not deliver the message until commit() is called. Acknowledgement is automatic and the configured acknowledge mode is ignored.
    // In client-acknowledge mode, the client must call acknowledge() after receiving a message before it counts as consumed
    Session session = connection.createSession(transaction, acknowledgeMode);
    Destination destination = session.createQueue(queueName);
    MessageProducer producer = session.createProducer(destination);
    // With persistent delivery, a message is not lost if the JMS provider fails; it is delivered again once the server recovers
    producer.setDeliveryMode(deliveryMode);
    MessageConsumer consumer = session.createConsumer(destination);
    // The listener gets its own connection and session.
    // Note: only this second connection is stored in the context, so the first one cannot be closed through it.
    connection = factory.createConnection();
    connection.start();
    // The same transaction and acknowledge rules apply to the listener session
    Session listenerSession = connection.createSession(transaction, acknowledgeMode);
    MessageConsumer listener = listenerSession.createConsumer(destination);
    return new ActiveMqContext(connection, session, listenerSession, producer, consumer, listener, transaction);
}
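The comment about transacted sessions (a sent message is only delivered once commit() is called) can be illustrated with an in-memory model. TransactedSendSketch is a hypothetical class that imitates this behavior; it is not how ActiveMQ implements transactions, and it covers only the sending side.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model of sending through a transacted or non-transacted session. */
final class TransactedSendSketch {
    private final List<String> delivered = new ArrayList<>();    // visible to consumers
    private final List<String> uncommitted = new ArrayList<>();  // held back until commit
    private final boolean transacted;

    TransactedSendSketch(boolean transacted) {
        this.transacted = transacted;
    }

    void send(String msg) {
        if (transacted) {
            uncommitted.add(msg);   // buffered, not yet visible
        } else {
            delivered.add(msg);     // visible immediately
        }
    }

    void commit() {
        requireTransacted();
        delivered.addAll(uncommitted);
        uncommitted.clear();
    }

    void rollback() {
        requireTransacted();
        uncommitted.clear();        // buffered messages are dropped
    }

    List<String> visible() {
        return new ArrayList<>(delivered);
    }

    private void requireTransacted() {
        // JMS also rejects commit/rollback on a non-transacted session
        if (!transacted) {
            throw new IllegalStateException("session is not transacted");
        }
    }

    public static void main(String[] args) {
        TransactedSendSketch session = new TransactedSendSketch(true);
        session.send("msg-0");
        System.out.println("before commit: " + session.visible()); // prints before commit: []
        session.commit();
        System.out.println("after commit: " + session.visible());  // prints after commit: [msg-0]
        session.send("msg-1");
        session.rollback();
        System.out.println("after rollback: " + session.visible()); // prints after rollback: [msg-0]
    }
}
```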
3.2 Registering a point-to-point context as a Spring bean
@Bean
public ActiveMqContext queueContext() throws JMSException {
    return ActiveMQUtil.getQueueContext(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE, DeliveryMode.PERSISTENT, queueName);
}
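3.3 Message producer
@Component
public class QueueProcedure {
    @Resource(name = "queueContext")
    private ActiveMqContext context;

    public void sendMsg(String msg) throws JMSException {
        // sendMsg takes a priority; Message.DEFAULT_PRIORITY (4) matches the priority in the output below
        ActiveMQUtil.sendMsg(context, msg, Message.DEFAULT_PRIORITY);
    }
}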
3.4 Message consumer
@Component
public class QueueConsumer {
    @Resource(name = "queueContext")
    private ActiveMqContext context;

    public void receiveMsg() throws JMSException {
        ActiveMQUtil.receiveMsg(context);
    }
}
3.5 Message listener
@Component
public class QueueListener {
    @Resource(name = "queueContext")
    private ActiveMqContext context;

    public void receiveMsg() throws JMSException {
        ActiveMQUtil.listenMsg(context);
    }
}
3.6 Test code
3.6.1 Sending messages
@Test
public void sendMsg() throws JMSException, InterruptedException {
    for (int i = 0; i < 10; i++) {
        procedure.sendMsg("msg-" + i);
        Thread.sleep(100);
    }
}
Output
2022-07-13 23:43:53.106 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033092, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-0}
2022-07-13 23:43:53.199 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033194, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-1}
2022-07-13 23:43:53.313 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033294, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-2}
2022-07-13 23:43:53.399 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033395, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-3}
2022-07-13 23:43:53.515 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033496, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-4}
2022-07-13 23:43:53.600 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:6, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033596, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-5}
2022-07-13 23:43:53.715 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:7, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033697, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-6}
2022-07-13 23:43:53.826 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:8, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033798, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-7}
2022-07-13 23:43:53.905 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-60057-1594655029182-1:2:1:1:9, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594655033899, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-8}
In the MQ console, 10 messages are now waiting to be consumed.
3.6.2 Consuming messages with the consumer
@Test
public void receiveMsg() throws JMSException {
    consumer.receiveMsg();
}
Output
2022-07-13 23:47:56.409 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-0
2022-07-13 23:47:56.411 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-1
2022-07-13 23:47:56.412 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-2
2022-07-13 23:47:56.412 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-3
2022-07-13 23:47:56.413 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-4
2022-07-13 23:47:56.413 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-5
2022-07-13 23:47:56.414 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-6
2022-07-13 23:47:56.414 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-7
2022-07-13 23:47:56.415 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-8
2022-07-13 23:47:56.415 - [main] INFO com.jms.activemq.ActiveMQUtil : 126 - receive msg:msg-9
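In the MQ console, these 10 messages have now been consumed.
3.6.3 Receiving messages with the listener
@Test
public void listenerMsg() throws JMSException {
    queueListener.receiveMsg();
}
Send the messages once more and then start the listener; the result is the same as with the consumer.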
3.6.4 Consuming and listening at the same time
@Test
public void sendAndReceiveMsg() throws JMSException, InterruptedException {
    new Thread(() -> {
        try {
            receiveMsg();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> {
        try {
            listenerMsg();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }).start();
    sendMsg();
}
Output
2022-07-15 22:29:11.333 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351307, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-0}
2022-07-15 22:29:11.412 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 168 - listen msg:msg-1
2022-07-15 22:29:11.415 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351409, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-1}
2022-07-15 22:29:11.515 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 150 - receive msg:msg-2
2022-07-15 22:29:11.515 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351512, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-2}
2022-07-15 22:29:11.620 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 168 - listen msg:msg-3
2022-07-15 22:29:11.620 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351614, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-3}
2022-07-15 22:29:11.721 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 150 - receive msg:msg-4
2022-07-15 22:29:11.721 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351715, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-4}
2022-07-15 22:29:11.820 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 168 - listen msg:msg-5
2022-07-15 22:29:11.820 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:6, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351816, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-5}
2022-07-15 22:29:11.920 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 150 - receive msg:msg-6
2022-07-15 22:29:11.920 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:7, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823351916, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-6}
2022-07-15 22:29:12.020 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 168 - listen msg:msg-7
2022-07-15 22:29:12.036 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:8, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823352020, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-7}
2022-07-15 22:29:12.125 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 150 - receive msg:msg-8
2022-07-15 22:29:12.125 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:9, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823352122, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-8}
2022-07-15 22:29:12.190 - [DefaultMessageListenerContainer-2] ERROR o.s.jms.listener.DefaultMessageListenerContainer : 962 - Could not refresh JMS Connection for destination 'springboot-topic' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect
2022-07-15 22:29:12.225 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 168 - listen msg:msg-9
2022-07-15 22:29:12.231 - [ActiveMQ NIO Worker 8] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64442-1594823347898-1:3:1:1:10, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://demo-queue, transactionId = null, expiration = 0, timestamp = 1594823352225, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-9}
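As the output shows, the messages are shared between the consumer and the listener: each message is handled by only one of them.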
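The same competing-consumers behavior can be reproduced without a broker. In the sketch below two threads take from one java.util.concurrent.BlockingQueue, standing in for the polling consumer and the listener; CompetingConsumersSketch is a hypothetical class, and how the messages are split between the two threads is up to the scheduler, so it will vary from run to run.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

/** Two workers competing for messages on one in-memory queue. */
final class CompetingConsumersSketch {

    /** Returns the messages seen by each of the two workers. */
    static List<Set<String>> run(int messageCount) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Set<String> first = ConcurrentHashMap.newKeySet();
        Set<String> second = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(messageCount);

        Thread a = worker(queue, first, done);
        Thread b = worker(queue, second, done);
        a.start();
        b.start();
        for (int i = 0; i < messageCount; i++) {
            queue.put("msg-" + i);
        }
        done.await();      // every message has been recorded by one worker
        a.interrupt();     // workers are blocked in take(); interrupt stops them
        b.interrupt();
        a.join();
        b.join();
        return List.of(first, second);
    }

    private static Thread worker(BlockingQueue<String> queue, Set<String> sink, CountDownLatch done) {
        return new Thread(() -> {
            try {
                while (true) {
                    sink.add(queue.take());   // take() hands each message to exactly one caller
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // normal shutdown path
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        List<Set<String>> result = run(10);
        System.out.println("worker 1 handled " + result.get(0).size() + " messages");
        System.out.println("worker 2 handled " + result.get(1).size() + " messages");
    }
}
```

Whatever the split, the two sets never overlap and together contain all ten messages, which is the point-to-point guarantee that each message has only one consumer.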
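4 Publish/subscribe messaging test
4.1 Building the publish/subscribe ActiveMqContext
public static ActiveMqContext getTopicContext(int acknowledgeMode, int deliveryMode, String queueName) throws JMSException {
    Connection connection = factory.createConnection();
    if (deliveryMode == DeliveryMode.NON_PERSISTENT) {
        connection.start();
    } else {
        // A durable topic subscription requires a client ID on the connection
        connection.setClientID("consumer" + (index++));
    }
    // In a transacted session, send() does not deliver the message until commit() is called. Acknowledgement is automatic and the configured acknowledge mode is ignored.
    // In client-acknowledge mode, the client must call acknowledge() after receiving a message before it counts as consumed
    Session session = connection.createSession(Boolean.FALSE, acknowledgeMode);
    Topic destination = session.createTopic(queueName);
    MessageProducer producer = session.createProducer(destination);
    // With persistent delivery, a message is not lost if the JMS provider fails; it is delivered again once the server recovers
    producer.setDeliveryMode(deliveryMode);
    // For the persistent case, subscribe through a durable TopicSubscriber
    MessageConsumer consumer = deliveryMode == DeliveryMode.NON_PERSISTENT
            ? session.createConsumer(destination)
            : session.createDurableSubscriber(destination, queueName);
    if (deliveryMode == DeliveryMode.PERSISTENT) {
        // For a durable subscription, start the connection only after the consumer has been created
        connection.start();
    }
    // ActiveMqContext has seven fields; as an assumption, the same session and consumer also fill the listener slots here
    return new ActiveMqContext(connection, session, session, producer, consumer, consumer, Boolean.FALSE);
}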
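What the durable branch above buys can be sketched with an in-memory model: the broker keeps a backlog per durable subscription, identified by client ID and subscription name, and hands it over when the subscriber connects again. DurableTopicSketch and its method names are hypothetical; a real broker also handles acknowledgement, expiry, and unsubscribing, none of which is modeled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of durable topic subscriptions; hypothetical, not a JMS client. */
final class DurableTopicSketch {
    // One backlog per durable subscription, keyed by "clientId:subscriptionName"
    private final Map<String, List<String>> backlogs = new LinkedHashMap<>();

    /**
     * Registers the subscription on first use, then returns and clears
     * everything published since the previous call for this subscription.
     */
    List<String> connect(String clientId, String subscriptionName) {
        List<String> backlog = backlogs.computeIfAbsent(
                clientId + ":" + subscriptionName, key -> new ArrayList<>());
        List<String> delivered = new ArrayList<>(backlog);
        backlog.clear();
        return delivered;
    }

    /** Stores the message for every registered subscription, connected or not. */
    void publish(String msg) {
        for (List<String> backlog : backlogs.values()) {
            backlog.add(msg);
        }
    }

    public static void main(String[] args) {
        DurableTopicSketch topic = new DurableTopicSketch();
        topic.connect("consumer0", "demo-topic");   // first connect only registers
        topic.publish("t-0");                        // published while consumer0 is away
        topic.publish("t-1");
        System.out.println(topic.connect("consumer0", "demo-topic")); // prints [t-0, t-1]
        System.out.println(topic.connect("consumer1", "demo-topic")); // prints []
    }
}
```

The second subscriber registers only after t-0 and t-1 were published, so it gets nothing: a durable subscription relaxes the timing dependency for an existing subscription, not for one created later.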
4.2 Registering a publish/subscribe context as a Spring bean (non-durable)
@Bean
public ActiveMqContext topicContext() throws JMSException {
    return ActiveMQUtil.getTopicContext(Session.AUTO_ACKNOWLEDGE, DeliveryMode.NON_PERSISTENT, topicName);
}
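4.3 Message producer
@Component
public class TopicProcedure {
    @Resource(name = "topicContext")
    private ActiveMqContext context;

    public void sendMsg(String msg) throws JMSException {
        // sendMsg takes a priority; Message.DEFAULT_PRIORITY (4) matches the priority in the output below
        ActiveMQUtil.sendMsg(context, msg, Message.DEFAULT_PRIORITY);
    }
}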
4.4 Message consumer
@Component
public class TopicConsumer {
    @Resource(name = "topicContext")
    private ActiveMqContext context;

    public void receiveMsg() throws JMSException {
        ActiveMQUtil.receiveMsg(context);
    }
}
4.5 Message listener
@Component
public class TopicListener {
    @Resource(name = "topicContext")
    private ActiveMqContext context;

    public void receiveMsg() throws JMSException {
        ActiveMQUtil.listenMsg(context);
    }
}
4.6 Test code
4.6.1 Sending messages
@Test
public void sendMsg() throws JMSException, InterruptedException {
    for (int i = 0; i < 10; i++) {
        procedure.sendMsg("msg-" + i);
        Thread.sleep(100);
    }
}
Output
2022-07-15 21:11:48.959 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818708955, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-0}
2022-07-15 21:11:49.062 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709058, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-1}
2022-07-15 21:11:49.166 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709160, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-2}
2022-07-15 21:11:49.263 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709262, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-3}
2022-07-15 21:11:49.366 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709362, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-4}
2022-07-15 21:11:49.466 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:6, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709462, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-5}
2022-07-15 21:11:49.569 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:7, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709562, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-6}
2022-07-15 21:11:49.665 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:8, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709664, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-7}
2022-07-15 21:11:49.768 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:9, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709764, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-8}
2022-07-15 21:11:49.867 - [ActiveMQ NIO Worker 9] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-63053-1594818704703-1:4:1:1:10, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594818709865, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-9}
4.6.2 Consumer receives messages
@Test
public void receiveMsg() throws JMSException {
consumer.receiveMsg();
}
4.6.3 Listener listens for messages
@Test
public void listenerMsg() throws JMSException {
listener.receiveMsg();
}
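At this point, neither the consumer nor the listener receives any messages. With publish/subscribe and a non-durable subscription, a subscriber only receives messages published after it has subscribed, so the messages that were sent before the consumer started are never delivered to it.
If we start the consumer first and then the producer, the messages are consumed.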
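The ordering rule above can be illustrated with a toy in-memory model. This is only a sketch: `ToyTopic` and `ToyTopicDemo` are hypothetical names and are not part of ActiveMQ or JMS. The model assumes a non-durable topic that simply drops a message for anyone who is not subscribed when it is published.

```java
import java.util.ArrayList;
import java.util.List;

// Toy in-memory model of a non-durable topic (an assumption for illustration, NOT the ActiveMQ API).
class ToyTopic {
    // Each subscriber is represented by the list of messages delivered to it.
    private final List<List<String>> subscribers = new ArrayList<>();

    // Register a subscriber; it only sees messages published from now on.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Deliver a copy to every current subscriber; with no subscribers the message is dropped.
    void publish(String msg) {
        for (List<String> inbox : subscribers) {
            inbox.add(msg);
        }
    }
}

class ToyTopicDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("msg-0");                  // nobody is subscribed yet, so msg-0 is lost
        List<String> early = topic.subscribe();
        topic.publish("msg-1");
        List<String> late = topic.subscribe();
        topic.publish("msg-2");
        System.out.println("early = " + early);  // early = [msg-1, msg-2]
        System.out.println("late  = " + late);   // late  = [msg-2]
    }
}
```

A real broker adds durable subscriptions, acknowledgements, and network delivery on top of this, so the model only captures the "subscribe first, then publish" behaviour.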
4.6.4 The correct way to receive messages
@Test
public void sendMsg() throws JMSException, InterruptedException {
    // Run the blocking consumer in its own thread
    new Thread(() -> {
        try {
            receiveMsg();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }).start();
    // Run the listener in a second thread
    new Thread(() -> {
        try {
            listenerMsg();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }).start();
    // Send 10 messages, 100 ms apart
    for (int i = 0; i < 10; i++) {
        procedure.sendMsg("msg-" + i);
        Thread.sleep(100);
    }
    // Block the test thread so the consumer and listener threads keep running
    synchronized (this) {
        this.wait();
    }
}
Output
2022-07-15 22:11:26.283 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-0
2022-07-15 22:11:26.280 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286276, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-0}
2022-07-15 22:11:26.286 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-0
2022-07-15 22:11:26.380 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286377, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-1}
2022-07-15 22:11:26.383 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-1
2022-07-15 22:11:26.383 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-1
2022-07-15 22:11:26.481 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:3, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286477, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-2}
2022-07-15 22:11:26.487 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-2
2022-07-15 22:11:26.490 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-2
2022-07-15 22:11:26.584 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:4, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286578, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-3}
2022-07-15 22:11:26.588 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-3
2022-07-15 22:11:26.591 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-3
2022-07-15 22:11:26.679 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:5, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286679, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-4}
2022-07-15 22:11:26.682 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-4
2022-07-15 22:11:26.682 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-4
2022-07-15 22:11:26.779 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:6, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286779, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-5}
2022-07-15 22:11:26.782 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-5
2022-07-15 22:11:26.782 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-5
2022-07-15 22:11:26.879 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:7, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286879, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-6}
2022-07-15 22:11:26.879 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-6
2022-07-15 22:11:26.883 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-6
2022-07-15 22:11:26.982 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:8, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822286982, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-7}
2022-07-15 22:11:26.985 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-7
2022-07-15 22:11:26.985 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-7
2022-07-15 22:11:27.086 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:9, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822287083, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-8}
2022-07-15 22:11:27.089 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-8
2022-07-15 22:11:27.089 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-8
2022-07-15 22:11:27.187 - [ActiveMQ NIO Worker 7] INFO org.apache.activemq.jms.pool.PooledProducer : 106 - send mq msg success, msg:ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = ID:LAPTOP-7KJFACUD-64079-1594822282964-1:4:1:1:10, originalDestination = null, originalTransactionId = null, producerId = null, destination = topic://demo-topic, transactionId = null, expiration = 0, timestamp = 1594822287184, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = msg-9}
2022-07-15 22:11:27.191 - [Thread-5] INFO com.jms.activemq.ActiveMQUtil : 144 - receive msg:msg-9
2022-07-15 22:11:27.191 - [ActiveMQ Session Task-1] INFO com.jms.activemq.ActiveMQUtil : 162 - listen msg:msg-9
As the output shows, both the consumer and the listener receive the complete set of messages.
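The test above starts the receiver threads and begins sending right away, so it relies on both subscriptions being in place before the first message goes out. One way to make that ordering explicit is a `CountDownLatch`. The sketch below uses only the JDK and in-memory inboxes instead of a broker; `ReadyBeforeSend`, `run`, and the inbox list are hypothetical names introduced for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class ReadyBeforeSend {
    // Starts `subscribers` receiver threads, publishes `messages` messages,
    // and returns how many messages each subscriber saw.
    static int[] run(int subscribers, int messages) {
        List<BlockingQueue<String>> inboxes = new CopyOnWriteArrayList<>();
        CountDownLatch ready = new CountDownLatch(subscribers);
        int[] received = new int[subscribers];
        Thread[] threads = new Thread[subscribers];
        for (int i = 0; i < subscribers; i++) {
            final int id = i;
            threads[i] = new Thread(() -> {
                BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
                inboxes.add(inbox);   // stands in for "subscribe to the topic"
                ready.countDown();    // signal that this subscription is in place
                try {
                    for (int n = 0; n < messages; n++) {
                        inbox.take();
                        received[id]++;
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        try {
            ready.await();            // do not publish until every subscriber is registered
            for (int n = 0; n < messages; n++) {
                for (BlockingQueue<String> inbox : inboxes) {
                    inbox.add("msg-" + n);
                }
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run(2, 10))); // [10, 10]
    }
}
```

In the JMS test, the equivalent would be to count down the latch once the consumer and the listener have been created, and to await it before the send loop. Where exactly that happens depends on how the consumer and listener helpers are written.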
5 Summary
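1. With point-to-point messaging, the consumer does not have to be started before the producer sends. Consumers on the same queue share the producer's messages: each message is delivered to only one of them.
2. With publish/subscribe messaging, the consumer must be started first (for a non-durable subscription). Consumers subscribed to the same topic each receive the complete set of messages.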
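The two delivery modes in the summary can be contrasted with a small in-memory sketch. It is not ActiveMQ code: `ToyDestinations`, `deliverQueue`, and `deliverTopic` are hypothetical names, and the round-robin split for the queue is an assumption made for simplicity. A real broker decides which consumer gets each message based on its own dispatch settings.

```java
import java.util.ArrayList;
import java.util.List;

class ToyDestinations {
    // Queue semantics: each message goes to exactly one consumer
    // (round-robin here, which is an assumption for illustration).
    static List<List<String>> deliverQueue(List<String> messages, int consumers) {
        List<List<String>> inboxes = emptyInboxes(consumers);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumers).add(messages.get(i));
        }
        return inboxes;
    }

    // Topic semantics: every subscriber gets its own copy of each message.
    static List<List<String>> deliverTopic(List<String> messages, int subscribers) {
        List<List<String>> inboxes = emptyInboxes(subscribers);
        for (String msg : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(msg);
            }
        }
        return inboxes;
    }

    private static List<List<String>> emptyInboxes(int n) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> msgs = List.of("msg-0", "msg-1", "msg-2", "msg-3");
        // queue: [[msg-0, msg-2], [msg-1, msg-3]]
        System.out.println("queue: " + deliverQueue(msgs, 2));
        // topic: [[msg-0, msg-1, msg-2, msg-3], [msg-0, msg-1, msg-2, msg-3]]
        System.out.println("topic: " + deliverTopic(msgs, 2));
    }
}
```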