过滤消息

大部分我们在同一个topic下面,指定消息的tag就可以划分不同的消息

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(“groupName”);
//订阅topic,第二个参数指定的tag
consumer.subscribe(“TopicName”,“TAGA || TAGB || TAGC”);

  • 这样子就能接受包含TAGA,TAGB,TAGC的消息.但是限制是一个消息只能有一个tag

sql基本语法

  • rocketMQ能用一些简单的sql语法来支持过滤消息.

  • 数值比较,比如 >,>=,<=,BETWEEN,=

  • 字符比较,比如: =,<,>

  • IS NULL或者IS NOT NULL

  • 逻辑符号AND ,OR,NOT

  • 常量支持类型为

  • 数值,比如 123 ,14

  • 字符,比如 ‘acv’,要用单引号包裹起来

  • NULL,特殊的常量

  • 布尔值,TRUE或FALSE

  • 只能使用push模式的消费者才能使用SQL92标准的sql语句

  • 生产者


public class DefaultProducer {
    public static void main(String[] args) throws Exception {
        //初始化生产者
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        //指定nameServer地址
        producer.setNamesrvAddr("localhost:9876");
        //启动
        producer.start();
        for (int i = 0; i < 100; i++) {
            //创建消息,指定topic,tag和消息体
            Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            //设置属性,可以在消费端进行筛选
            msg.putUserProperty("a",String.valueOf(i));
            //发送并有result返回,可根据result判断发送是否成功
            SendResult result = producer.send(msg);
            System.out.println(result);
        }
        //关闭
        producer.shutdown();

    }
}
  • 消费者

public class DefaultConsumer {
  public static void main(String[] args) throws Exception {
      //实例化消费者,指定组名
      DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupName");
      //指定NameServer地址
      consumer.setNamesrvAddr("localhost:9876");
      //订阅topic,第二个参数过滤消息的一些属性
      consumer.subscribe("TopicName",MessageSelector.bySql("i > 5"));

      //注册回调函数,处理消息
      consumer.registerMessageListener(new MessageListenerConcurrently() {
          @Override
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
              System.out.println(list);
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      });
      //启动消费者
      consumer.start();
      System.out.println("Consumer start ");
  }
}