指定分区和偏移量,时间消费
指定分区消费
从topic的0号分区最新offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
从头消费
从topic的0号分区的开始offset消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
指定offset消费
从topic的0号分区的10这个offset开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
指定时间消费
根据时间,去所有的partition中确定该时间对应的offset,然后去所有的partition中找到该offset之后的消息开始消费。
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
TopicPartition key = entry.getKey();
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() + "|offset-" + offset);
System.out.println();
//根据消费⾥的timestamp确定offset
if (value != null) {
consumer.assign(Arrays.asList(key));
consumer.seek(key, offset);
}
}