背景

  • 网站内,要实时高效的处理海量数据
    1. 用户行为日志
    2. 业务数据库变更
    3. 网站上爬取的数据
    4. 也可以作为系统业务解耦的方式。

基本概念

  • broker:指代kafka的进程,一般放一台服务器上,即真实存储消息的服务器
  • topic:逻辑分区,同一类业务数据集合,可以有多个topic
  • partition: 物理分区,实际存储每个topic的消息,具体位置是在broker上,每个broker上可能有多个topic的不同分区。

kafka 的系统设计

消息设计

(1)消息总长度
(2)时间戳增量:跟RecordBatch的时间戳的增量差值
(3)offset增量:跟RecordBatch的offset的增量差值
(4)key长度
(5)key
(6)value长度
(7)value
(8)header个数
(9)header:自定义的消息元数据,key-value对

存储设计

(1) .log 文件:每个分区(Partition)对应一个或多个.log文件,这些文件被称为日志段(Log Segment)
(2) .index 文件 每个日志段对应一个.index文件。
(3) .timestamp文件

kafka 的几大技术

一、kafka的高可用设计

1、多副本冗余设置

  • 即一个partition不仅有leader,还有多个follow分布在不同的机器上,当一个broker宕机后,通过zk的协调,重新选出leader继续提供服务。

2、ISR列表设置

  • in-sync replica,跟leader partition保持同步的follower partition的数量,从新选举的leader只能从该列表出。消息提交成功,也是在该列表中的partition 全部复制成功,才算提交了。

3、HW和LEO

  • HW:代表消费者可看到的消息的offset。
  • LEO:代表下一个要写入的数据的offset。
    • 过程:
      (1)leader 收到消息会更新自己的LEO,同时维护每个follow的LEO值
      (2)follow 拉取消息时候,会带上自己的LEO值,同时能从leader拿到当前leader的HW
      (3)follow 拉取到leader的HW,更新自己的HW ,策略为 min(leader的HW,自己的LEO),
      (4)leader 更新HW,看自己的LEO和follow的LEO,选用min(all LEO) 作为HW。
      (5)每次leader再重新选举后,都有一个版本概念,以及自己写数据的下标记录。

4、一些参数

  • replica.lag.time.max.ms:如果某个follower的LEO一直落后leader超过了该设定的秒,那么才判定这个follower是out-of-sync,
  • replica.lag.time.max.ms:规定了follower如果在该时间内仍然没有找leader发送请求,就会认为follower是out-of-sync,就会从ISR列表里移除了
  • log.index.interval.bytes:在日志文件写入多少数据,就要在索引文件写一条索引,默认是4KB

二、kafka的高性能

  • 延迟:是处理一条消息的时间

  • 吞吐量:是整个系统处理消息的能力

    1、批处理

    • 平衡了延迟和吞吐量,让性能更加优秀。

    2、顺序写磁盘

    • 先写日志缓存(kafka内存)
    • 达到时间限额,写os 的page cache
    • 最后写入磁盘

    3、零拷贝

    kafka零拷贝优化

    4、使用了Reactor网络通讯模型

    • Acceptor:负责接受新的网络连接。 默认1个
    • Processor:负责处理网络I/O操作,包括读取请求和写入响应。默认3
    • Request Handler:负责处理具体的请求逻辑。 默认8个
    • Selector:Java NIO中的Selector,用于监控多个Channel的I/O事件,每个Processor线程都包含一个Selector,用于监控其负责的网络连接

三、kafka的producer 高性能

1、流程

  • 创建ProducerRecord,包含主题(topic)、分区(partition)、键(key)、值(value),
  • 根据分区策略选择分区。
  • 进入缓冲区(RecordAccumulator)中,等待批量发送。batch.size=16k,满了就发出去,linger.ms 表示超过x毫秒也会发出去。
  • Sender线程负责从缓冲区中取出消息批次,遍历topic 和 parition,如果发送到同一个broker,进一步整合,最后打包成ProduceRequest。max.request.size=1m,否则就不能发送

2、核心参数

  • cks=0:Producer不等待Broker的确认,消息可能会丢失。
  • acks=1:Producer等待Leader确认,消息仅在Leader上持久化后返回确认。
  • acks=all(或acks=-1):Producer等待所有ISR(In-Sync Replicas)中的副本确认,确保消息在多个副本上持久化后返回确认。

3、防止消息重复提交且保证分区顺序

  • enable.idempotence=true:kafka提供的幂等性producer
  • 原理:每个幂等性Producer实例在初始化时,会从Kafka集群获取一个唯一的Producer ID,每个消息在发送时,会被分配一个递增的序列号。序列号在每个分区内是唯一且递增的,Broker端会维护每个分区的最新序列号,并在接收到消息时进行去重和顺序检查

四,kafka的consumer 高性能

1、reblance 时机

  • heartbeat.interval.ms:consumer心跳时间,
  • session.timeout.ms:kafka多长时间感知不到一个consumer
  • max.poll.interval.ms:如果在两次poll操作之间,超过了这个时间,

2、其他参数

  • auto.offset.reset:earliest,如果下次重启,发现要消费的offset不在分区的范围内,就会重头开始消费;latest,当没有初始偏移量或当前偏移量超出范围时,消费者将从分区的最新消息开始消费
  • enable.auto.commit : 开启自动提交

五、broker消息积压了怎么办

1、首先需要对kafka的topic 进行监控

2、修改topic 的配置,新增分区

3、新增消费者实例,加入消费者组

4、 优化消费者线程

5、批量拉取消息,增大拉取消息的大小限制。