26.6 注解驱动的监听端点

异步接收消息的最简单的方法是使用注解监听端点的基础架构。简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点。

@Component 
public class MyService { 

    @JmsListener(destination = "myDestination") 
    public void processOrder(String data) { ... } 
} 

上述示例的想法是,每当javax.jms.Destination “myDestination” 上有消息可用时,就调用相应地processOrder方法(在这种情况下,JMS消息的内容类似于MessageListenerAdapter提供的内容)。

注解端点的基础架构使用JmsListenerContainerFactory为每个注解方法创建一个消息监听容器。这样的容器没有针对应用上下文进行注册,但是可以使用JmsListenerEndpointRegistry bean 进行简单的管理。

@JmsListener是 Java 8上的可重复注解,因此可以通过向其添加额外的@JmsListener声明将多个JMS目的地关联到同一个方法。 在 Java 6和7上,你可以使用@JmsListeners注解。

26.6.1 启用监听端点的注解

要启用对@JmsListener注解的支持,请将@EnableJms添加到你的一个@Configuration类上。

@Configuration 
@EnableJms 
public class AppConfig { 

    @Bean 
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { 
        DefaultJmsListenerContainerFactory factory = 
                new DefaultJmsListenerContainerFactory(); 
        factory.setConnectionFactory(connectionFactory()); 
        factory.setDestinationResolver(destinationResolver()); 
        factory.setConcurrency("3-10"); 
        return factory; 
    } 
} 

默认情况下,基础架构将查找名为jmsListenerContainerFactory的 bean 作为用于创建消息监听容器的工厂源。 在这种情况下,忽略 JMS 基础架构的设置,processOrder方法可以在一个线程池中被调用,线程池核心数为3,最大线程数为10。

对于使用的每个注解都可以自定义监听容器工厂,或通过实现JmsListenerConfigurer接口来显示的配置默认值。仅在存在没有指定容器工厂的端点时,默认值才是必须的。有关详细信息和示例,请参阅 javadoc。

如果您喜欢XML配置,请使用<jms:annotation-driven>元素。

<jms:annotation-driven/> 

   <bean id="jmsListenerContainerFactory" 
           class="org.springframework.jms.config.DefaultJmsListenerContainerFactory"> 
       <property name="connectionFactory" ref="connectionFactory"/> 
       <property name="destinationResolver" ref="destinationResolver"/> 
       <property name="concurrency" value="3-10"/> 
   </bean> 

26.6.2 编程式端点注册

JmsListenerEndpoint提供了 JMS 端点的模型,并负责为该模型配置容器。除了使用注解之外,基础架构也允许你用编程的方式来配置端点。

@Configuration 
@EnableJms 
public class AppConfig implements JmsListenerConfigurer { 

    @Override 
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { 
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint(); 
        endpoint.setId("myJmsEndpoint"); 
        endpoint.setDestination("anotherQueue"); 
        endpoint.setMessageListener(message -> { 
            // processing 
        }); 
        registrar.registerEndpoint(endpoint); 
    } 
} 

本例中我们使用SimpleJmsListenerEndpoint来提供MessageListener,你也可以建立自己的端点变体并自定义调用机制。

应该注意的是,你完全可以不使用@JmsListener,而仅通过JmsListenerConfigurer来注册所有端点。

26.6.3 注解式端点方式签名

到目前为止,我们已经在我们的端点注入了一个简单的String,但实际上它可以有一个非常灵活的方法签名。现在让我们重写它并注入一个带有自定义头部的Order:

@Component 
public class MyService { 

       @JmsListener(destination = "myDestination") 
       public void processOrder(Order order, @Header("order_type") String orderType) { 
           ... 
       } 
} 

可以向JMS 监听端点中注入的主要元素包括:

  • 原始的javax.jms.Message或任意子类(当然,它与传入的消息类型相匹配)。
  • 可选的javax.jms.Session来操作 JMS 原生 API,来发送自定义回复。
  • 代表着接收消息的org.springframework.messaging.Message。注意此消息同时包含自定义和JmsHeaders定义的标准头。
  • @Header注解的方法参数被用来提取一个特定的头部值,包括标准 JMS 头。
  • @Headers注解参数必须指定给一个java.util.Map,用来获取所有头。
  • 不被支持的(如Message、Session等)、且无注解的元素被视为有效载荷。可以明确的给它们添加@Payload注解。也可以通过添加@Valid注解来开启校验。

注入Spring 的Message抽象可以获取特定消息的所有信息,而无需依赖特定传输 API。

@JmsListener(destination = "myDestination") 
public void processOrder(Message<Order> order) { ... } 

可以自己扩展DefaultMessageHandlerMethodFactory来处理额外的方法参数。同时你也可以自定义转换和校验规则。

例如,如果我们需要在处理之前确保我们的Order有效,我们可以使用@Valid对有效负载进行注解,并配置必要的验证器,如下所示:

@Configuration 
@EnableJms 
public class AppConfig implements JmsListenerConfigurer { 

       @Override 
       public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { 
           registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory()); 
       } 

       @Bean 
       public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() { 
           DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory(); 
           factory.setValidator(myValidator()); 
           return factory; 
       } 
} 

26.6.4 响应管理

MessageListenerAdapter允许你的方法有非空返回值。此时返回值将被封装在一个 javax.jms.Message中,发往原始消息的JMSReplyTo头中定义的目的地或者监听自己默认的目的地。监听默认的目的地可以通过@SendTo注解来设置。

假定现在我们的processOrder方法将返回一个OrderStatus,下面将展示如何自动地发送响应:

@JmsListener(destination = "myDestination") 
@SendTo("status") 
public OrderStatus processOrder(Order order) { 
       // order processing 
       return status; 
} 

如果你有多个@JmsListener注解方法,您还可以将@SendTo注解放在 class 上以共享默认响应目的地。

如果您需要以独立传输的方式设置额外的头信息,则可以返回一个Message,如下所示:

@JmsListener(destination = "myDestination") 
@SendTo("status") 
public Message<OrderStatus> processOrder(Order order) { 
       // order processing 
       return MessageBuilder 
               .withPayload(status) 
               .setHeader("code", 1234) 
               .build(); 
} 

如果响应的目的地是运行时实时计算的,可以将响应结果封装在一个JmsResponse中,直接指定一个目的地。前面的例子可以重写如下:

@JmsListener(destination = "myDestination") 
public JmsResponse<Message<OrderStatus>> processOrder(Order order) { 
       // order processing 
       Message<OrderStatus> response = MessageBuilder 
               .withPayload(status) 
               .setHeader("code", 1234) 
               .build(); 
    return JmsResponse.forQueue(response, "status"); 
}