1.添加一个配置文件
配置信息如下:
logging.level.com.alibaba.cloud.stream.binder.rocketmq=DEBUG
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.bindings.output1.destination=test-topic
spring.cloud.stream.bindings.output1.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output1.producer.group=binder-group
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
spring.cloud.stream.bindings.output2.destination=TransactionTopic
spring.cloud.stream.bindings.output2.content-type=application/json
spring.cloud.stream.rocketmq.bindings.output2.producer.transactional=true
spring.cloud.stream.rocketmq.bindings.output2.producer.group=myTxProducerGroup
spring.cloud.stream.bindings.output3.destination=pull-topic
spring.cloud.stream.bindings.output3.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output3.producer.group=pull-binder-group
spring.application.name=rocketmq-produce-example
server.port=28081
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
2.添加一个启动类
@SpringBootApplication
public class RocketMQProduceApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQProduceApplication.class, args);
}
}
3.添加 MQSource
在 Source 里面定义输出:
public interface MQSource {
@Output("output1")
MessageChannel output1() ;
@Output("output2")
MessageChannel output2() ;
@Output("output1")
MessageChannel output3() ;
}
4.添加发送消息的类
@Service
public class SendService {
@Autowired
private MQSource source;
//发送简单的测试消息
public void send(String msg) throws Exception {
source.output1().send(MessageBuilder.withPayload(msg).build());
}
//发消息时添加标签
public <T> void sendWithTags(T msg, String tag) throws Exception {
Message message = MessageBuilder.createMessage(msg, new MessageHeaders(Stream.of(tag).collect(Collectors.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
source.output1().send(message);
}
//发送一个对象消息
public <T> void sendObject(T msg, String tag) throws Exception {
Message message = MessageBuilder.withPayload(msg).setHeader(MessageConst.PROPERTY_TAGS, tag).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build();
source.output1().send(message);
}
//发送事务的消息
public <T> void sendTransactionalMsg(T msg, int num) throws Exception {
MessageBuilder builder = MessageBuilder.withPayload(msg).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
builder.setHeader("test", String.valueOf(num));
Message message = builder.build();
source.output2().send(message);
}
public void sendMassiveMessage(String msg) {
source.output3().send(MessageBuilder.withPayload(msg).build());
}
}
5.添加配置类
代码如下:
@Configuration @
EnableBinding({
MQSource.class})
public class MQConfig {
}
6.事务消息往往需要我们监听回查
新建一个类:
代码如下:
//TransactionStatus.CommitTransaction:消息提交,当消息状态为 CommitTransaction,表示允许消费者允许消费当前消息
//TransactionStatus.RollbackTransaction:消息回滚,表示 MQ 服务端将会删除当前半消息,不允许消费者消费
//TransactionStatus.Unknown:中间状态,表示 MQ 服务需要发起回查操作,检测当前发送方本地事务的执行状态。
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup", corePoolSize = 5, maximumPoolSize = 10)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
//消息生产者需要在 executeLocalTransaction 中执行本地事务,当事务半消息提交成功,执行完毕后需要返回事务状态码
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object o){
Object num = msg.getHeaders().get("test");
if ("1".equals(num)) {
System.out.println( "executer: " + new String((byte[]) msg.getPayload()) + " unknown");
return RocketMQLocalTransactionState.UNKNOWN; // 将会导致再次查询本地事务
}else if ("2".equals(num)) {
System.out.println( "executer: " + new String((byte[]) msg.getPayload()) + " rollback");
return RocketMQLocalTransactionState.ROLLBACK; // 半消息将会被 mq 服务器删除,并且消费者不会消费到该消息
}
System.out.println( "executer: " + new String((byte[]) msg.getPayload()) + " commit");
return RocketMQLocalTransactionState.COMMIT; // 半消息提交,消费者会消费到该消息。
}
//实现 checkLocalTransaction 方法,该方法用于进行本地事务执行情况回查,并回应事务状态给 MQ 的 broker
//执行完成之后需要返回对应的事务状态码
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
System.out.println("check: " + new String((byte[]) message.getPayload()));
return RocketMQLocalTransactionState.COMMIT;
}
}
7.构建一个简单的模型
代码如下:
8.测试消息的发送
@RestController
public class SendMessageController {
@Autowired
private SendService sendService ;
//发送一个简单的消息
@GetMapping("/send/simple")
private ResponseEntity<String> sendSimpleMessage( @RequestParam(required = true) String msg) throws Exception {
sendService.send(msg);
return ResponseEntity.ok("发送成功") ;
}
//发送消息并且带上标签
@GetMapping("/send/tags")
private ResponseEntity<String> sendMessageWithTag( @RequestParam(required = true) String msg,@RequestParam(required = true)String tags) throws Exception {
sendService.sendWithTags(msg,tags);
return ResponseEntity.ok("发送成功") ;
}
//发送对象消息
@GetMapping("/send/object")
public ResponseEntity<String> sendObjectMessage(User user,String tags) throws Exception {
sendService.sendObject(user,tags);
return ResponseEntity.ok("发送成功") ;
}
//发送一个事务消息,也就是 half 消息
@GetMapping("/send/transaction")
public ResponseEntity<String> sendTransactionMessage(String msg ,int num) throws Exception {
sendService.sendTransactionalMsg(msg,num);
return ResponseEntity.ok("发送成功") ;
}
//发送很多消息
@GetMapping("/send/poll")
public ResponseEntity<String> sendMassiveMessage(String msg) throws Exception {
sendService.sendMassiveMessage(msg);
return ResponseEntity.ok("发送成功") ;
}
}
9.启动类
代码如下:
@SpringBootApplication
public class RocketMQProduceApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQProduceApplication.class, args);
}
}