kafka Producer API
1.消息发送流程
kafka的producer发送消息采用的是异步发送的方式。在消息的发送过程中,涉及到了两个线程–main线程和sender线程,以及一个线程共享变量–RecordAccumulator。main线程将消息发送给RecordAccumulator,sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。
相关参数:
batch.size
:只有数据积累到batch.size之后,sender才会发送数据
linger.ms
:如果数据迟迟未达到batch.size,sender等待linger.ms之后就会发送数据
2.异步发送API
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
服务类
KafkaProducer
:创建一个生产者对象,用来发送数据
ProducerConfig
:获取kafka需要的一系列配置参数
ProducerRecord
:每条数据都要封装成一个ProducerRecord对象
2.1 不调用回到函数#
public class MyProducer {
public static void main(String[] args) throws InterruptedException {
//1.创建kafka生产者的配置信息
Properties properties = new Properties();
//2.指定连接的kafka集群
properties.put("bootstrap.servers", "hll1:9092");
//3.ack机制
properties.put("acks", "all");
//4.重试次数
properties.put("retries", 3);
//5.批次大小,16384=16k
properties.put("batch.size", 16384);
//6.等待时间,时间到了之后会发送数据
properties.put("linger.ms", 1);
//7.RecordAccumulator缓冲区大小,33554432=32M
properties.put("buffer.memory", 33554432);
//8.key value的序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//9.创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//10.发送数据
//topic信息写在ProducerRecord
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<String, String>("bigdata", "hll---" + i));
}
//11.关闭连接,如果不关闭连接,消费者不会接收到消息
producer.close();
//或者可以暂停线程,达到"linger.ms"的配置要求也可以完成消费
//Thread.sleep(1000);
}
}
启动一个消费者后再运行代码,可以看到成功消费通过代码生成的消息
2.2 调用回调函数#
public class CallbackProducer {
public static void main(String[] args) {
//创建kafka配置信息
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//创建kafka生产者对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
//发送数据
for (int i = 0; i < 10; i++) {
kafkaProducer.send(new ProducerRecord<>("bigdata", "hll::" + i),
(metadata, exception) -> {
if (exception == null) {
System.out.println(metadata.partition() + "::" + metadata.offset());
}
});
}
//关闭资源
kafkaProducer.close();
}
}
从打印的结果来看,消息被平均的分配到了两个分区(当前测试的主题只有两个分区)
3.自定义分区
public class MyPartitioner implements Partitioner {
/**
* 分区选取
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//默认分区获取方法
//new DefaultPartitioner().partition()
//可以实现自己分区策略,返回的需要是可用的分区的
return 0;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
public class PartitionerProducer {
public static void main(String[] args) {
//kafka配置文件
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hll1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
//自定义分区加载器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.hll.partitioner.MyPartitioner");
//创建kafka生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i = 0; i < 10; i++) {
producer.send(new ProducerRecord<>("bigdata", "hll::" + i),
(metadata, exception) -> {
//如果exception为null,说明消息发送成功没有异常
if (exception == null) {
System.out.println(metadata.partition() + "==" + metadata.offset());
} else {
exception.printStackTrace();
}
});
}
//关闭连接
producer.close();
}
}
4.同步发送API
同步发送的意思是,一条消息发送之后,会阻塞当前线程,直到返回ack.
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需要调用Future对象的get()方法即可。
//调用get,阻塞线程,同步发送
producer.send(new ProducerRecord<String, String>("bigdata", "hll---" + i)).get();