异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
上述示例的想法是,每当javax.jms.Destination “myDestination” 上有消息可用时,就调用相应地processOrder方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的内容)。
注解端点的基础架构使用JmsListenerContainerFactory为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry bean 进行简单的管理。
@JmsListener是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners注解。
26.6.1 启用监听端点的注解
要启用对@JmsListener注解的支持,请将@EnableJms添加到你的一个@Configuration类上。
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setDestinationResolver(destinationResolver());
factory.setConcurrency("3-10");
return factory;
}
}
默认情况下,基础架构将查找名为jmsListenerContainerFactory的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。
对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。
如果您喜欢XML配置,请使用<jms:annotation-driven>
元素。
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory"
class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="concurrency" value="3-10"/>
</bean>
26.6.2 编程式端点注册
JmsListenerEndpoint提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
本例中我们使用SimpleJmsListenerEndpoint来提供MessageListener,你也可以建立自己的端点变体并自定义调用机制。
应该注意的是,你完全可以不使用@JmsListener,而仅通过JmsListenerConfigurer来注册所有端点。
26.6.3 注解式端点方式签名
到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
可以向JMS 监听端点中注入的主要元素包括:
- 原始的javax.jms.Message或任意子类(当然,它与传入的消息类型相匹配)。
- 可选的javax.jms.Session来操作 JMS 原生 API,来发送自定义回复。
- 代表着接收消息的org.springframework.messaging.Message。注意此消息同时包含自定义和JmsHeaders定义的标准头。
- @Header注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。
- @Headers注解参数必须指定给一个java.util.Map,用来获取所有头。
- 不被支持的(如Message、Session等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload注解。也可以通过添加@Valid注解来开启校验。
注入Spring 的Message抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
可以自己扩展DefaultMessageHandlerMethodFactory来处理额外的方法参数。同时你也可以自定义转换和校验规则。
例如,如果我们需要在处理之前确保我们的Order有效,我们可以使用@Valid对有效负载进行注解,并配置必要的验证器,如下所示:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
26.6.4 响应管理
MessageListenerAdapter允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message中,发往原始消息的JMSReplyTo头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo注解来设置。
假定现在我们的processOrder方法将返回一个OrderStatus,下面将展示如何自动地发送响应:
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你有多个@JmsListener注解方法,您还可以将@SendTo注解放在 class 上以共享默认响应目的地。
如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message,如下所示:
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse中,直接指定一个目的地。前面的例子可以重写如下:
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}