本文共 11151 字,大约阅读时间需要 37 分钟。
三、源码分析
1. 新的日志段是如何生成的。由于LogSegment是由Log管理的,所以按理说,控制生成新的LogSegment的方法就应该在Log类中。这个方法就是roll()方法,它的注释如下:
* Roll the log over to a new active segment starting with the current logEndOffset.* This will trim the index to the exact size of the number of entries it currently contains.大概翻译就是: 滚动生成新的active日志段,以当前的LogEndOffset作为日志段的起始偏移量。 这个方法将裁剪索引文件至实际大小。(具体为何裁剪索引文件可以看《深入理解Kafka服务端之索引文件及mmap内存映射 》)
这里要重点关注一个概念: LogEndOffset,简称LEO。是Kafka服务端十分重要的一个概念,后面还会多次提及。 表示的是下一条待写入消息的偏移量。
生成新日志段的流程分析:
def roll(expectedNextOffset: Option[Long] = None): LogSegment = { maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") { //记录开始时间 val start = time.hiResClockMs() lock synchronized { //检查索引文件的内存映射是否关闭 checkIfMemoryMappedBufferClosed() //计算新日志段文件的起始偏移量baseOffset val newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset) //生成新的日志文件 val logFile = Log.logFile(dir, newOffset) //如果newOffset对应的日志段文件已存在 if (segments.containsKey(newOffset)) { //如果active日志段的起始偏移量和上面计算的起始偏移量相同,且active日志段的日志文件大小为0,则删除active日志段 if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { warn(s"Trying to roll a new log segment with start offset $newOffset " + s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + s" size of offset index: ${activeSegment.offsetIndex.entries}.") //异步删除active日志段(先标记为.delete) deleteSegment(activeSegment) } else { //如果日志段文件已存在且不是active,则抛异常 throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + s"segment is ${segments.get(newOffset)}.") } //如果计算出的baseOffset比active日志段的baseOffset还小,则抛异常 } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) { throw new KafkaException( s"Trying to roll a new log segment for topic partition $topicPartition with " + s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment") } else { //代码走这里说明newOffset正常 val offsetIdxFile = offsetIndexFile(dir, newOffset)//生成index文件名 val timeIdxFile = timeIndexFile(dir, newOffset)//生成timeindex文件名 val txnIdxFile = transactionIndexFile(dir, newOffset)//生成txnindex文件名 //如果文件已存在则删除 for (file if file.exists) { warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") Files.delete(file.toPath) } Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment()) } producerStateManager.updateMapEndOffset(newOffset) producerStateManager.takeSnapshot() //生成新的日志段对象 val segment = LogSegment.open(dir, baseOffset = newOffset, config, time = time, fileAlreadyExists = false, initFileSize = initFileSize, preallocate = config.preallocate) //将新的日志段对象添加到Map addSegment(segment) //更新LEO updateLogEndOffset(nextOffsetMetadata.messageOffset) // 异步刷写旧的日志段 scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L) info(s"Rolled new log segment at offset $newOffset in ${time.hiResClockMs() - start} ms.") //返回生成的日志段对象 segment } }}
a. 检查该Log对象的索引文件内存映射是否关闭
//检查索引文件的内存映射是否关闭,默认未关闭,如果关闭了则该Log对象就不允许进行磁盘IO操作了checkIfMemoryMappedBufferClosed()
是否关闭由一个boolean类型的变量标记,默认为false
@volatile private var isMemoryMappedBufferClosed = false
b. 计算新的日志段文件的起始偏移量baseOffset
//计算新日志段文件的起始偏移量baseOffsetval newOffset = math.max(expectedNextOffset.getOrElse(0L), logEndOffset这里newOffset取的是调用roll方法传入的Long类型的参数和LEO的较大值
c. 生成新的日志文件,即.log文件。参数dir就是该Log对象对应的分区在磁盘上的物理路径
//生成新的日志文件val logFile = Log.logFile(dir, newOffset)d.验证起始偏移量的合法性 d1. 如果起始偏移量newOffset对应的日志段对象已存在:
if (segments.containsKey(newOffset)) { //如果active日志段的起始偏移量和上面计算的起始偏移量相同,且active日志段的日志文件大小为0,则删除active日志段 if (activeSegment.baseOffset == newOffset && activeSegment.size == 0) { warn(s"Trying to roll a new log segment with start offset $newOffset " + s"=max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already " + s"exists and is active with size 0. Size of time index: ${activeSegment.timeIndex.entries}," + s" size of offset index: ${activeSegment.offsetIndex.entries}.") //异步删除active日志段(先标记为.delete) deleteSegment(activeSegment) } else { //如果日志段文件已存在且不是active,则抛异常 throw new KafkaException(s"Trying to roll a new log segment for topic partition $topicPartition with start offset $newOffset" + s" =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " + s"segment is ${segments.get(newOffset)}.") }}
d2. 如果计算出的newOffset比active日志段的起始偏移量baseOffset还小,则抛异常:
else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) { throw new KafkaException( s"Trying to roll a new log segment for topic partition $topicPartition with " + s"start offset $newOffset =max(provided offset = $expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active segment $activeSegment")}
d3. 如果newOffset没有问题,则创建日志段对应的各种索引文件。
else { //代码走这里说明newOffset正常 val offsetIdxFile = offsetIndexFile(dir, newOffset)//生成index文件名 val timeIdxFile = timeIndexFile(dir, newOffset)//生成timeindex文件名 val txnIdxFile = transactionIndexFile(dir, newOffset)//生成txnindex文件名 //如果文件已存在则删除 for (file if file.exists) { warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first") Files.delete(file.toPath) } Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())}
e. 生成新的日志段对象
//生成新的日志段对象val segment = LogSegment.open(dir, baseOffset = newOffset, config, time = time, fileAlreadyExists = false, initFileSize = initFileSize, preallocate = config.preallocate)
f. 将生成的日志段对象添加到集合segments中,这个集合用的就是ConcurrentSkipListMap跳表这种数据结构
//将新的日志段对象添加到MapaddSegment(segment)
g. 更新LEO
//更新LEOupdateLogEndOffset(nextOffsetMetadata.messageOffset)
h. 调度异步程序刷写旧的日志段文件
scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
i. 返回新生成日志段对象
2.滚动生成新日志段的条件剖析:
直接查看调用roll方法的地方,在Log.maybeRoll方法中,该方法的作用是:如果有必要,滚动生成一个新的日志段对象返回;否则返回当前 active 日志段对象:
private def maybeRoll(messagesSize: Int, appendInfo: LogAppendInfo): LogSegment = { //获取当前active日志段对象 val segment = activeSegment //获取当前时间 val now = time.milliseconds //消息中的最大时间戳 val maxTimestampInMessages = appendInfo.maxTimestamp //消息中的最大偏移量 val maxOffsetInMessages = appendInfo.lastOffset //如果需要滚动生成新的日志段对象 if (segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))) { debug(s"Rolling new log segment (log_size = ${segment.size}/${config.segmentSize}}, " + s"offset_index_size = ${segment.offsetIndex.entries}/${segment.offsetIndex.maxEntries}, " + s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " + s"inactive_time_ms = ${segment.timeWaitedForRoll(now, maxTimestampInMessages)}/${config.segmentMs - segment.rollJitterMs}).") appendInfo.firstOffset match { case Some(firstOffset) => roll(Some(firstOffset)) case None => roll(Some(maxOffsetInMessages - Integer.MAX_VALUE)) } } else { segment }}
其中判断是否需要滚动生成新的日志段对象调用的是LogSegment.shouldRoll方法:
segment.shouldRoll(RollParams(config, appendInfo, messagesSize, now))
LogSegment.shouldRoll方法:
def shouldRoll(rollParams: RollParams): Boolean = { //日志段等待滚动的时间 > 日志段保留的最大时间 - 扰动时间 即:如果为true,表示可以滚动了 //maxTimestampInMessages:待写入消息中的最大时间戳 //maxSegmentMs:日志段保留的最大时间,默认168小时,由参数 log.roll.hours 配置 //rollJitterMs:扰动值,避免同一时间生成大量的日志段文件给磁盘带来的压力 val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs //滚动条件:①日志段大小超过maxSegmentBytes(默认1G) size > rollParams.maxSegmentBytes - rollParams.messagesSize || //②日志段不为空,且等待滚动时间超过168h (size > 0 && reachedRollMs) || //③偏移量索引文件或者时间戳索引文件大小超过10M ④最大偏移量和baseOffset的差值超过Int.MaxValue offsetIndex.isFull || timeIndex.isFull || !canConvertToRelativeOffset(rollParams.maxOffsetInMessages)//判断是否可以将日志段中最大的偏移量转为相对偏移量}
a. timeWaitedForRoll方法:计算日志段等待滚动的时间:
def timeWaitedForRoll(now: Long, messageTimestamp: Long) : Long = { //加载日志段中第一个批次的最大时间戳 loadFirstBatchTimestamp() rollingBasedTimestamp match { //如果 rollingBasedTimestamp不为None,则返回当前待写入消息的最大时间戳 - rollingBasedTimestamp 的值 case Some(t) if t >= 0 => messageTimestamp - t //如果 rollingBasedTimestamp 为None,则返回当前时间 - 日志段的创建时间 case _ => now - created }}其中loadFirstBatchTimestamp方法的作用是:如果变量rollingBasedTimestamp为None,则加载日志段中第一个批次的最大时间戳(如果日志段为空,那么该变量还是None);如果不为None则什么都不做
private def loadFirstBatchTimestamp(): Unit = { if (rollingBasedTimestamp.isEmpty) { val iter = log.batches.iterator() if (iter.hasNext) rollingBasedTimestamp = Some(iter.next().maxTimestamp) }}这里再看下变量rollingBasedTimestamp如果不为None,是什么时候被赋值的,在LogSegment.append()方法中,其值是 第一次写入的消息集合的最大时间戳 :
//获取日志段追加数据的起始物理地址val physicalPosition = log.sizeInBytes()//如果物理地址为0,说明日志段为空,即当前为第一次写入消息if (physicalPosition == 0) //更新用于日志段切分的时间戳为当前消息集合的最大时间戳 rollingBasedTimestamp = Some(largestTimestamp)
所以日志段等待滚动的时间的计算逻辑为:
rollParams.maxSegmentMs:日志段时间维度的阈值,由服务端参数:log.roll.hours 和 log.roll.ms 配置,log.roll.ms 优先级高。但默认情况下只配置了 log.roll.hours ,为168,即7天。如果等待超过这个时间该日志段还未写满,则滚动生成新的日志段(如果日志段为空则不会滚动)。
rollJitterMs:扰动值。避免在同一时间生成大量的日志段文件给磁盘IO带来压力
val reachedRollMs = timeWaitedForRoll(rollParams.now, rollParams.maxTimestampInMessages) > rollParams.maxSegmentMs - rollJitterMs
reachedRollMs:Boolean值,标记时间维度是否满足滚动条件。
c. 滚动生成新日志段的条件:
c1. 日志段容量达到阈值:
size > rollParams.maxSegmentBytes - rollParams.messagesSize=>size + rollParams.messagesSize > rollParams.maxSegmentBytes即已经写入的字节 + 待写入消息字节 > 日志段最大容量
rollParams.maxSegmentBytes:日志段文件的最大容量。由服务端参数:log.segment.bytes 配置,默认为1G。
(size > 0 && reachedRollMs)c3. 索引文件容量达到阈值
offsetIndex.isFull || timeIndex.isFull
索引文件的阈值由服务端参数:log.index.size.max.bytes 配置,默认为10M。
c4. 待写入消息的最大偏移量 - 日志段的起始偏移量baseOffset 超过Int.MaxValue,无法转为相对偏移量:!canConvertToRelativeOffset(rollParams.maxOffsetInMessages)总结: 日志段滚动的条件有4个:
转载地址:http://tsdko.baihongyu.com/