发送批量消息

  • 批量发送下消息

  • 优点:能提高性能

  • 缺点

    • 一批消息只能有相同的topic,相同的waitStroeMsgOK
    • 不能是延时消息
    • 一批消息的总大小不能超过4MB
public class BatchProducer {
    public static void main(String[] args) throws Exception {
        //初始化生产者
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        //指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动
        producer.start();
        List<Message> msgs = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            //创建消息,指定topic,tag和消息体
            Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msgs.add(msg);
        }
        //批量发送
        SendResult result = producer.send(msgs);
        System.out.println(result);
        //关闭
        producer.shutdown();

    }
}
  • 针对总长度不能超过4MB,可以调用以下这个工具类

public class Listsplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;

   public Listsplitter(List<Message> messages) {
       this.messages = messages;
   }

   @Override
   public boolean hasNext() {
       return currIndex < messages.size();
   }

   @Override
   public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (;nextIndex < messages.size();nextIndex ++){
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length()+message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
             tmpSize += entry.getKey().length()+entry.getValue().length();
           }
           tmpSize = tmpSize + 20;//增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT){
               //单条消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0){
                 //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                   nextIndex ++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT){
               break;
           }else{
               totalSize += tmpSize;
           }
       }
       List<Message> subList = messages.subList(currIndex,nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
  • 生产者就变成了这样

public class BatchProducer {
   public static void main(String[] args) throws Exception {
       //初始化生产者
       DefaultMQProducer producer = new DefaultMQProducer("producer_group");
       //指定nameServer地址
       producer.setNamesrvAddr("localhost:9876");
       //启动
       producer.start();
       List<Message> msgs = new ArrayList<>();
       for (int i = 0; i < 100; i++) {
           //创建消息,指定topic,tag和消息体
           Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
           msgs.add(msg);
       }
       Listsplitter listsplitter = new Listsplitter(msgs);
       while (listsplitter.hasNext()){
           List<Message> next = listsplitter.next();
           //批量发送
           SendResult result = producer.send(next);
           System.out.println(result);
       }
       //关闭
       producer.shutdown();

   }
}