1、整合 ROCKETMQ 实现消息生产消费(ROCKETMQTEMPLATE实现)
消息队列能解决什么问题:异步处理、应用解耦、流量削锋、日志处理、消息通讯
在整套的微服务架构中, 消息队列是不可或缺的部分, 它能够起到线程内同步或者异步调用无法达到的作用。
优点:
1、 解耦;
i.只依赖消息的格式, 而不依赖发送者的ip和端口
ii.多消费者的情况下, 发送者不需要关注消费者的任何信息
2、 路由;
不能互相访问的网络之间可以消息队列实现访问, 可以减少对现有网络的修改。
消息可靠性
当消费者发生故障时, 消息可以被有效保存下来, 等待恢复后继续访问.
3、 异步调用;
发送者异步发送消息, 不等待消息ack,不会对发送者本身产生响应速度的影响, 当然异步调用也是可以实现的。
4、 方便扩展;
集群部署消息队列, 当流量增大和减小是可以通过调整部署来实现和发送方, 消费方无关。
缺点:多出一个环节,需要保证消息队列的可用性。
1、MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,ActiveMQ只是Apache下一个队列项目,不仅仅支持MQTT协议,也支持其他比如AMQP等协议。MQTT是协议,协议只是定义好的规则。ActiveMQ只是实现了MQTT协议的一个程序。
如果说传统的消息队列中间件一般应用于微服务之间,那么适用于物联网的微消息队列 MQTT 版则实现了端与云之间的消息传递和真正意义上的万物互联。
百度百科 AMQP https://baike.baidu.com/item/AMQP/8354716?fr=aladdin
2、ActiveMQ
ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。
5种消息类型
TextMessage:java.lang.String对象,如xml文件内容。
MapMessage:key/value键值对的集合,key是String对象,值类型可以是Java任何基本类型。
BytesMessage:字节流。
StreamMessage:Java 中的输入输出流。
ObjectMessage:Java中的可序列化对象。
3、SpringBoot实现JmsTemplate处理ActiveMQ消息
@EnableJms //启动消息队列
@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)
@SpringBootApplication
public class ActiveApplication {
public static void main(String[] args) {
SpringApplication.run(ActiveApplication.class, args);
}
}
BeanConfig.java 定义存放消息的队列。
@Configuration
public class BeanConfig {
//定义存放消息的队列
@Bean(name = "deviceMSG")
public Queue deviceMSG(){
return new ActiveMQQueue("deviceMSG?consumer.prefetchSize=1");
}
}
ProviderService 生产者,创建Controller测试
@Component
public class ProviderService {
@Autowired private JmsMessagingTemplate jmsMessagingTemplate;
@Resource(name = "deviceMSG")
@Autowired private Queue queue;
public void sendMessage(){
Map<String, byte[]> msgMap = new HashMap<>();
byte[] ctx = new byte[]{49, 54, 50, 47, 51, 48, 54, 51, 52, 50};
String devId = "2201000001";
msgMap.put("devId", devId.getBytes());
msgMap.put("ctx", ctx);
jmsMessagingTemplate.convertAndSend(queue ,msgMap);
}
}
创建消费者 ConsumerService.java
@Component
public class ConsumerService {
@Autowired private MainService mainService;
@JmsListener(destination = "deviceMSG", concurrency = "10-20")
public void handlerMessage(Map<String, byte[]> msgMap)
{
mainService.anylysisDataPackage(msgMap);
}
}
创建Controller测试
@RestController
public class IndexController {
@Autowired ProviderService providerService;
@RequestMapping("/sendMessage")
public void sendMessage(){
logger.info("Provider send msg success");
providerService.sendMessage();
}
}