博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
华为如何生成日志_深入理解Kafka服务端之滚动生成新日志段的流程及条件
阅读量:6469 次
发布时间:2019-06-23

本文共 11151 字,大约阅读时间需要 37 分钟。

一、场景分析     之前提到,Kafka中一个分区对应一个Log对象,每个Log对象下面又划分了多个日志段LogSegment。那么这些日志段的划分策略是什么?即满足什么条件时会生成新的日志段,以及生成新日志段的流程是什么样的。这篇来进行详细的分析。
二、图示说明
滚动生成新的日志段流程:

b99dae6afe00db38319968bad3cd3c31.png

三、源码分析

    1. 新的日志段是如何生成的。

    由于LogSegment是由Log管理的,所以按理说,控制生成新的LogSegment的方法就应该在Log类中。这个方法就是roll()方法,它的注释如下:

* Roll the log over to a new active segment starting with the current logEndOffset.* This will trim the index to the exact size of the number of entries it currently contains.
大概翻译就是:     滚动生成新的active日志段,以当前的LogEndOffset作为日志段的起始偏移量。     这个方法将裁剪索引文件至实际大小。(具体为何裁剪索引文件可以看《深入理解Kafka服务端之索引文件及mmap内存映射 》)
这里要重点关注一个概念:  LogEndOffset,简称LEO。是Kafka服务端十分重要的一个概念,后面还会多次提及。  表示的是下一条待写入消息的偏移量。

生成新日志段的流程分析:

def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
//记录开始时间 val start = time.hiResClockMs() lock synchronized {
//检查索引文件的内存映射是否关闭 checkIfMemoryMappedBufferClosed() //计算新日志段文件的起始偏移量baseOffset val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) //生成新的日志文件 val logFile = Log.logFile(dir, newOffset) //如果newOffset对应的日志段文件已存在      if (segments.containsKey(newOffset)) {
//如果active日志段的起始偏移量和上面计算的起始偏移量相同,且active日志段的日志文件大小为0,则删除active日志段        if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
warn(s"Trying to roll a new log segment with start offset $newOffset " + s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + s" size of offset index: ${activeSegment.offsetIndex.entries}.") //异步删除active日志段(先标记为.delete) deleteSegment(activeSegment) } else {
//如果日志段文件已存在且不是active,则抛异常 throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + s"segment is ${segments.get(newOffset)}.") } //如果计算出的baseOffset比active日志段的baseOffset还小,则抛异常 } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
throw new KafkaException( s"Trying to roll a new log segment for topic partition $topicPartition with " + s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") } else {
//代码走这里说明newOffset正常 val offsetIdxFile = offsetIndexFile(dir, newOffset)//生成index文件名 val timeIdxFile = timeIndexFile(dir, newOffset)//生成timeindex文件名 val txnIdxFile = transactionIndexFile(dir, newOffset)//生成txnindex文件名 //如果文件已存在则删除 for (file if file.exists) {
warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") Files.delete(file.toPath) } Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())      }       producerStateManager.updateMapEndOffset(newOffset) producerStateManager.takeSnapshot() //生成新的日志段对象 val segment = LogSegment.open(dir, baseOffset = newOffset, config, time = time, fileAlreadyExists = false, initFileSize = initFileSize, preallocate = config.preallocate) //将新的日志段对象添加到Map addSegment(segment) //更新LEO updateLogEndOffset(nextOffsetMetadata.messageOffset)      // 异步刷写旧的日志段 scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.") //返回生成的日志段对象 segment } }}

a. 检查该Log对象的索引文件内存映射是否关闭

//检查索引文件的内存映射是否关闭,默认未关闭,如果关闭了则该Log对象就不允许进行磁盘IO操作了checkIfMemoryMappedBufferClosed()

是否关闭由一个boolean类型的变量标记,默认为false

@volatile private var isMemoryMappedBufferClosed = false

b. 计算新的日志段文件的起始偏移量baseOffset

//计算新日志段文件的起始偏移量baseOffsetval newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset
这里newOffset取的是调用roll方法传入的Long类型的参数和LEO的较大值

c. 生成新的日志文件,即.log文件。参数dir就是该Log对象对应的分区在磁盘上的物理路径

//生成新的日志文件val logFile = Log.logFile(dir, newOffset)
d.验证起始偏移量的合法性     d1. 如果起始偏移量newOffset对应的日志段对象已存在:
  • 如果当前active日志段的起始偏移量baseOffset和newOffset一样,且active日志段里面没有保存任何消息,则异步删除active日志段
  • 如果不是active日志段,则抛出异常
if (segments.containsKey(newOffset)) {
//如果active日志段的起始偏移量和上面计算的起始偏移量相同,且active日志段的日志文件大小为0,则删除active日志段  if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) {
warn(s"Trying to roll a new log segment with start offset $newOffset " + s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + s" size of offset index: ${activeSegment.offsetIndex.entries}.") //异步删除active日志段(先标记为.delete) deleteSegment(activeSegment) } else {
//如果日志段文件已存在且不是active,则抛异常 throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + s"segment is ${segments.get(newOffset)}.")  }}

    d2. 如果计算出的newOffset比active日志段的起始偏移量baseOffset还小,则抛异常:

else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
throw new KafkaException( s"Trying to roll a new log segment for topic partition $topicPartition with " + s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")}

    d3. 如果newOffset没有问题,则创建日志段对应的各种索引文件。

else {
//代码走这里说明newOffset正常 val offsetIdxFile = offsetIndexFile(dir, newOffset)//生成index文件名 val timeIdxFile = timeIndexFile(dir, newOffset)//生成timeindex文件名 val txnIdxFile = transactionIndexFile(dir, newOffset)//生成txnindex文件名 //如果文件已存在则删除 for (file if file.exists) {
warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") Files.delete(file.toPath) } Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())}

e. 生成新的日志段对象

//生成新的日志段对象val segment = LogSegment.open(dir,  baseOffset = newOffset,  config,  time = time,  fileAlreadyExists = false,  initFileSize = initFileSize,  preallocate = config.preallocate)

f. 将生成的日志段对象添加到集合segments中,这个集合用的就是ConcurrentSkipListMap跳表这种数据结构

//将新的日志段对象添加到MapaddSegment(segment)
 

g. 更新LEO

//更新LEOupdateLogEndOffset(nextOffsetMetadata.messageOffset)

h. 调度异步程序刷写旧的日志段文件

scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)

i. 返回新生成日志段对象

    2.滚动生成新日志段的条件剖析:

    直接查看调用roll方法的地方,在Log.maybeRoll方法中,该方法的作用是:如果有必要,滚动生成一个新的日志段对象返回;否则返回当前 active 日志段对象:

private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = {
//获取当前active日志段对象 val segment = activeSegment //获取当前时间 val now = time.milliseconds //消息中的最大时间戳 val maxTimestampInMessages = appendInfo.maxTimestamp //消息中的最大偏移量 val maxOffsetInMessages = appendInfo.lastOffset //如果需要滚动生成新的日志段对象 if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) {
debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " + s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).") appendInfo.firstOffset match {
case Some(firstOffset) => roll(Some(firstOffset)) case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE)) } } else {
segment }}

其中判断是否需要滚动生成新的日志段对象调用的是LogSegment.shouldRoll方法:

segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))

LogSegment.shouldRoll方法:

def shouldRoll(rollParams: RollParams): Boolean = {
  //日志段等待滚动的时间 > 日志段保留的最大时间 - 扰动时间 即:如果为true,表示可以滚动了 //maxTimestampInMessages:待写入消息中的最大时间戳  //maxSegmentMs:日志段保留的最大时间,默认168小时,由参数 log.roll.hours 配置 //rollJitterMs:扰动值,避免同一时间生成大量的日志段文件给磁盘带来的压力 val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs //滚动条件:①日志段大小超过maxSegmentBytes(默认1G) size > rollParams.maxSegmentBytes - rollParams.messagesSize || //②日志段不为空,且等待滚动时间超过168h (size > 0 && reachedRollMs) || //③偏移量索引文件或者时间戳索引文件大小超过10M ④最大偏移量和baseOffset的差值超过Int.MaxValue offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)//判断是否可以将日志段中最大的偏移量转为相对偏移量}

a. timeWaitedForRoll方法:计算日志段等待滚动的时间:

def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = {
//加载日志段中第一个批次的最大时间戳  loadFirstBatchTimestamp() rollingBasedTimestamp match {
    //如果 rollingBasedTimestamp不为None,则返回当前待写入消息的最大时间戳 - rollingBasedTimestamp 的值 case Some(t) if t >= 0 => messageTimestamp - t //如果 rollingBasedTimestamp 为None,则返回当前时间 - 日志段的创建时间 case _ => now - created }}
其中loadFirstBatchTimestamp方法的作用是:如果变量rollingBasedTimestamp为None,则加载日志段中第一个批次的最大时间戳(如果日志段为空,那么该变量还是None);如果不为None则什么都不做
private def loadFirstBatchTimestamp(): Unit = {
if (rollingBasedTimestamp.isEmpty) {
val iter = log.batches.iterator() if (iter.hasNext) rollingBasedTimestamp = Some(iter.next().maxTimestamp) }}
这里再看下变量rollingBasedTimestamp如果不为None,是什么时候被赋值的,在LogSegment.append()方法中,其值是
第一次写入的消息集合的最大时间戳
//获取日志段追加数据的起始物理地址val physicalPosition = log.sizeInBytes()//如果物理地址为0,说明日志段为空,即当前为第一次写入消息if (physicalPosition == 0)  //更新用于日志段切分的时间戳为当前消息集合的最大时间戳  rollingBasedTimestamp = Some(largestTimestamp)

所以日志段等待滚动的时间的计算逻辑为:

  • 如果日志段为空:当前时间 -  日志段的创建时间
  • 如果日志段不为空:待写入消息集合的最大时间戳 - 第一次写入消息集合的最大时间戳
b. 如果日志段等待滚动的时间超过了 最大的保留时间   - 扰动值,则表示应该滚动生成新日志段了
  • rollParams.maxSegmentMs:日志段时间维度的阈值,由服务端参数:log.roll.hours 和 log.roll.ms 配置,log.roll.ms 优先级高。但默认情况下只配置了 log.roll.hours ,为168,即7天。如果等待超过这个时间该日志段还未写满,则滚动生成新的日志段(如果日志段为空则不会滚动)。

  • rollJitterMs:扰动值。避免在同一时间生成大量的日志段文件给磁盘IO带来压力

val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs

reachedRollMs:Boolean值,标记时间维度是否满足滚动条件。

c. 滚动生成新日志段的条件:

    c1. 日志段容量达到阈值:

size > rollParams.maxSegmentBytes - rollParams.messagesSize=>size + rollParams.messagesSize > rollParams.maxSegmentBytes即已经写入的字节 + 待写入消息字节 > 日志段最大容量
  • rollParams.maxSegmentBytes:日志段文件的最大容量。由服务端参数:log.segment.bytes 配置,默认为1G。

    
c2. 日志段不为空,且等待滚动的时间达到阈值。
这里注意:如果日志段为空,即使达到了等待滚动的时间,也不会滚动生成新的日志段
(size > 0 && reachedRollMs)
    
c3. 索引文件容量达到阈值
offsetIndex.isFull || timeIndex.isFull

    索引文件的阈值由服务端参数:log.index.size.max.bytes  配置,默认为10M。

    
c4. 待写入消息的最大偏移量 - 日志段的起始偏移量baseOffset 超过Int.MaxValue,无法转为相对偏移量:
!canConvertToRelativeOffset(rollParams.maxOffsetInMessages)
总结:
    日志段滚动的条件有4个:
  • 日志段容量超过阈值:默认为1G
  • 日志段不为空且等待滚动的时间超过阈值:默认为7天
  • 日志段索引文件(包括偏移量索引文件和时间戳索引文件)容量超过阈值:默认为10M
  • 待写入消息集合的最大偏移量 - 日志段起始偏移量 > Int.MaxValue,此时无法转为相对偏移量

转载地址:http://tsdko.baihongyu.com/

你可能感兴趣的文章
网络服务搭建、配置与管理大全(Linux版)
查看>>
Silverlight 5 Beta新特性[4]文本缩进控制
查看>>
springMVC多数据源使用 跨库跨连接
查看>>
简单java在线测评程序
查看>>
录音和朗诵的实现
查看>>
Git服务端和客户端安装笔记
查看>>
Spring Security(14)——权限鉴定基础
查看>>
云安全与IT系统漏洞管理成为IT决策者最关注的话题
查看>>
2016年全球光纤需求量将达4.25亿芯公里 中国占57%决定产业格局
查看>>
MaxCompute UDF系列之拼音转换
查看>>
《JavaScript和jQuery实战手册(原书第2版)》——2.2节内置函数
查看>>
部署混合云指南:多云服务商管理的八大要素
查看>>
视频监控热成像技术在民用领域的应用
查看>>
北大深圳医院使用移动医疗技术,阻断乙肝母婴传播
查看>>
火绒安全马刚自述:中国还有一个“纯粹”的杀毒软件
查看>>
20年历史的bug被发现会泄漏微软 Live 账号登录信息
查看>>
Java 基础DAY 01
查看>>
并发基础笔记-(线程基础)
查看>>
web前端学习教程(视频教程、学习教程、学习路线、课程大纲)
查看>>
[译] 论 Rust 和 WebAssembly 对源码地址索引的极限优化
查看>>