1.KafKaServer管理

1.启动KafKa单个节点

kafka-server-start.sh

2. 启动KafKa集群

自定义脚本启动kafka集群;

遍历brokers指定的代理列表取出每个节点,通过SSH方式登录该节点,执行kafka-server-start.sh脚本,启动Kafka。前提,需要确保安装配置了SSH;

对自定义脚本赋予执行权限。因为Kafka运行在JVM之上,因此会依赖相应的系统环境配置,为了保证各环境配置在执行该脚本的时候已经生效,在启动命令的时候可以加入,source/etc/profile命令;

3.关闭KafKa单个节点

kafka-server-stop.sh脚本

4.关闭KafKa集群

自定义脚本,通过遍历调用kafka-server-stop.sh脚本实现

2.主题管理

Kafka提供了一个kafka-topic.sh工具脚本用于对主题相关的操作,如创建主题、删除主题、修改主题分区数和副本分配以及修改主题级别的配置信息,查看主题信息等操作;

1.创建主题

Kafka提供以下两种方式创建主题:

1、 方式一:如果代理设置了auto.create.topics.enable=true,则当生产者向一个还未创建主题发送消息的时候,会自动创建一个拥有${num.partitions}个分区和${default.replication.factor}个副本的主题;
2、 方式二:客户端通过执行kafka-topics.sh脚本创建一个主题;

2.删除主题

1、 方式一:手动删除各节点${log.dir}目录下该主题分区文件夹,同时登录ZooKeeper客户端删除主题对应的节点,主题元素据保存在/brokers/topics和/config/topics目录下;
2、 方式二:执行kafka-topics.sh脚本进行删除,如果希望通过该脚本彻底删除主题,则需要保证在启动Kafka的时候所加载的server.properties文件中配置delete.topic.enable=true,该配置默认为false,否则执行该脚本并不是真正删除主题,只是在ZooKeeper的/admin/delete_topics目录下创建一个与待删除主题同名的节点,将该主题标记为删除状态;

3.查看主题

Kafka提供了list和describe两个命令查看主题信息,其中list参数列出Kafka所有的主题名,describe参数可以查看所有的主题或者某个特定主题的信息;

1、 查看所有主题;

kafka-topic.sh --list --ZooKeeper server-1:2181,server-2:2181

1、 查看某个特定主题信息;

kafka-topic.sh --describe --ZooKeeper server-1:2181,server-2:2181

可以通过指定topic参数查看特定的主题的信息;

1、 查看正在同步的主题;

通过describe与under-replicated-partitions命令组合使用

1、 查看没有Leader的分区;

通过describe与unavailable-partitions命令组合使用

1、 查看主题覆盖的配置;

通过describe与topic-with-overrides命令组合使用

4.修改主题

当创建一个主题之后,可以通过alter命令对主题进行修改,包括修改主题级别的配置、增加主题分区、修改副本分配方案、修改主题Offset等

1、 修改主题级别配置;

在创建主题的时候,可以通过config参数覆盖主题级别的默认配置,当主题创建之后可以通过alter与config参数组合使用,修改或者增加新的配置以覆盖相应配置原来的值,或者通过alter与delete-config删除相关配置使其恢复默认值。

Kafka提供了一个kafka-configs.sh的脚本,专门用来对配置进行操作;

2、 增加分区;

Kafka并不支持减少分区的操作,只能为一个主题增加分区;

3.生产者基本操作

Kafka自带了一个在终端演示生产者发布消息的脚本kafka-console-producer.sh;

1.启动生产者

Kafka自带了一个kafka-console-producer.sh,通过执行该脚本可以在终端调用Kafka生产者向Kafka发送消息。该脚本需要指定broker-list参数:用于指定Kafka的代理地址列表,topic参数:指定消息被发送的目标主题;

2.创建主题

如果开启了自动创建主题的配置项,auto.create.topics.enable=true,当生产者向一个还不存在的主题发送消息的时候,Kafka会自动的创建该主题;

3.查看消息

Kafka生产的消息是以二进制的形式存放在文件中,为了便于查看内容,Kafka提供了一个查看日志文件的工具类DumpLogSegments,通过kafka-run-class.sh,可以直接在终端运行该工具,files是必传的参数,用于指定要转储文件的路径,可以同时指定多个文件;

4.生产者性能测试工具

Kafka提供了一个用来测试生产者性能的工具脚本kafka-producer-perf-test.sh,通过该工具可以对生产者性能进行调优,通过优化不同的配置来提升生产者的效率,从而得到一组最优的参数配置,提高吞吐量;

4.消费者基本操作

1.消费信息

Kafka采用消费组的模式,同一个消费组下的各消费者在消费消息的时候是互斥的,也就是说,对于一条消息而言,就同一个消费组下的消费者来讲,只能被同组下的一个消费者消费,但不同的消费组的消费者能够消费同一条消息。如此,我们可以通过消费组来实现消息的单播和广播;

1、 旧版高级消费者;

在kafka-console-consumer.sh脚本通过是否指定zookeeper参数区分是旧版高级消费者还是新版消费者,因为新版消费者已经不依赖于ZooKeeper;

1、 旧版低级消费者;

kafka-simple-consumer-shell.sh

2、 新版消费者;

kafka-console-consumer.sh

【4】消费多主体

kafka-console.consumer.sh的topic参数不能指定多主体,可以通过whitelist同时指定多个主题;

2.单播与多播

Kafka引入了消费组,每个消费者都是与一个特定的消费组,通过消费组就可以实现消息的单播和多播;

1、 单播;

一条消息只能被某个消费者消费;实现方式:让这些消费者属于同一个消费组即可;

2、 多播:一条消息能够被多个消费者消费的模式;

实现多播,保证这些消费者属于不同的消费组即可;

3.查看消费偏移量

Kafka提供了一个查看某个消费组消费偏移量的kafka-consumer-offset-checker.sh脚本。通过该脚本可以查看某个消费组消费信息的情况;

在0.9版本之后建议使用kafka-consumer-groups.sh

4.消费者性能测试工具

kafka-consumer-perf-test.sh

5.配置管理

Kafka提供了kafka-configs.sh脚本用于对配置进行管理操作,支持修改配置(--alter)和查看配置(--describe)两个基本操作;

1.主题级别配置

kafka-configs.sh ---entity-type topics表示操作的配置类型为主题级别,--entity-name指定待操作的主题名

2.代理级别设置

在分区迁移的时候通过临时修改如下两个配置,通过对复制流量的合理的控制,可以实现数据之间的平滑迁移;

follower.replication.throttled.rate:设置Follower复制的速率,单位为B/S

leader.replication.throttled.rate:设置Leader节点传输速率,单位为B/S

3.客户端/用户级别配置

当前版本的Kafka在客户端级别以及用户级别支持配置生产者每秒最多写入消息的字节数(producer_byte_rate)和消费者每秒拉取的字节数(consumer_byte_rate),我们简称为流控设置。

6.分区操作

1.分区Leader平衡

Kafka提供了两种方法重新选择优先副本作为分区Leader的方法,使集群负载重新达到平衡。

1、 自动平衡:;

auto.leader.rebalance.enable=true

2、 手动平衡:;

Kafka提供了一个对分区Leader进行重新平衡的工具脚本kafka-preferred-replica-election.sh,通过该工具将优先副本选举为Leader,从而重新让集群分区达到平衡;

2.分区迁移

kafka-reassign-partitions.sh脚本的用法,高脚本在集群扩容,节点下线等场景时对分区迁移操作,从而使得集群负载达到均衡;

当下线一个节点的时候,需要将该节点上的分区副本迁移到其他可用的节点上,Kafka并不会自动的进行分区副本迁移,如果不进行手动的重新发分配,就会导致某些主题数据丢失和不可用的情况;

当新增加节点的时候,也只有新创建的主题才会分配到新的节点上,而之前的主题的分区并不会自动分配到新加入的节点上,因为在主题创建的时候,该主题的AR列表中并没有新加入的节点;

为了解决这些问题,就需要对分区副本重新进行合理的分配;

3.增加分区

Kafka自带的kafka-topics.sh脚本可以很方便的对某个主题的分区数进行修改;

4.增加副本

增加副本的操作是分区迁移的一个特例,本质上也是分区副本的重分配操作;

7.连接器基本操作

Kafka自带了对连接器应用的脚本,用于将数据从外部系统导入到Kafka或者从Kafka中导出到外部系统。

Kafka连接器有两种模式:

1、 独立模式(standalone):;

connect-standalone.sh

2、 分布式模式(distributed):;

connect-distributed.sh

1.独立模式

Kafka自带脚本connect-standlone.sh用以独立模式启动Kafka连接器。

执行该脚本需要指定两个配置文件,一个是worker运行时候相关配置的配置文件,称为WorkConfig,在该文件中指定与Kafka建立连接的配置bootstrap.servers,数据格式转换类key.converter/value.converter、保存偏移量的文件路径(offset.storage.file.filename)、提交偏移量的频率(offset.flush.interval.ms)等

另外需要指定是导入数据连接器或者是导出数据连接器

【将数据从外部导入到Kafka相应主题的数据连接器:Source连接器】

Kafka自带的connect-file-source.properties文件配置了一个读取文件的Source连接器

【将数据从Kafka导出到系统外部的数据连接器:Sink连接器】

Kafka自带脚本connect-console-sink.properties配置了有一个将Kafka中的数据导出到文件的Sink连接器。

2.REST风格API应用

Kafka提供了一套基于REST风格API接口来管理连接器,默认端口为8083,也可以在启动Kafka连接器前在WorkConfig配置文件中通过rest.port配置端口

3.分布式模式

Kafka自带的connect-distributed.sh脚本用以分布式模式运行连接器,执行该脚本需要指定一个WorkConfig类型的配置文件,但是以分布式模式启动连接器并不支持在启动的时候通过加载连接器配置文件创建一个连接器,而只能通过访问REST风格接口创建连接器;

8. KafKa Manager应用

Kafka Manager提供了对Kafka集群进行管理和监控的可视化Web界面,通过Web界面能够方便的对主题进行管理,包括创建主题、删除主题、查询集群的主题、增加分区、分区副本重分配、选择优先副本为Leader,修改主题级别配置。同时还可以监控分区的AR和ISR等信息,代理以及消费者运行情况等

9. KafKa 安全机制

0、 9版本之后,Kafka增加了身份认证和权限控制两种安全机制;

1、 身份认证:;

指客户端与服务端连接进行身份认证,包括客户端与kafka代理之间的连接认证、代理之间的连接认证、代理与ZooKeeper之间的连接认证。目前支持SSL、SASL/Kerberos、SASL/PLAIN这三种认证机制;

2、 权限控制:;

权限控制是指对客户端的读写操作进行权限控制,包括对于消息或者Kafka集群操作权限控制。权限控制是可插拔的,并且支持与外部的授权服务进行集成;

1.利用SASL/PLAIN进行身份认证

2.权限控制

kafk-acls.sh脚本支持查询(list)、添加(add)、移除(remove)这三类权限控制的操作。要启用Kafka ACL权限控制,首先需要在server.properties文件中增加权限控制实现类的设置;

10.镜像操作

Kafka提供了一个镜像操作的工具kafka-mirror-marker.sh,用于将一个集群的数据同步到另外一个集群。通过这个工具可以方便的实现两个集群之间的数据迁移;

Kafka镜像工具的本质是创建一个消费者,从源集群中待迁移的主题消费数据,然后创建一个生产者,将消费者从源集群中拉取的数据写入目标集群。

【注意】:

由于镜像操作的命令是启动一个生产者和一个消费者进行数据镜像操作,因此数据同步完成之后,该命令依然在等待新的数据进行同步,也就是需要客户端自己查看数据是否已经同步完成,在保证数据同步完成之后需要手动关闭该命令。同时客户端可以在目标集群中创建主题,主题的分区以及副本数可以与源集群中该主题对应的分区以及副本数不一致。

如果希望镜像操作启动的生产者在写入消息的时候创建主题则需要保证目标集群已经设置auto.create.topics.enable=true