延时任务
延时生产者

- 延时任务无法指定任意时间延迟,只能设置几个固定的延时等级,从1s到2h分别是1到18
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message msg = new Message("topicList", "tag", ("rocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(4);
SendResult result = producer.send(msg);
System.out.println(result);
}
producer.shutdown();
}
}
延时消费者
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("groupName");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicName","*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
System.out.println(System.currentTimeMillis()-msg.getStoreTimestamp());
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer start ");
}
}