Apache Kafka Source Code Analysis – Log Management

  LogManager
  LogManager manages all the logs on a broker (under its log directories); each partition of a topic corresponds to one log (one log subdirectory).
On startup, loadLogs loads the Log object for every partition, and the class provides management interfaces such as createLog, getLog, and deleteLog.
It also creates background threads for work such as cleanup, flushing, and checkpoint generation.
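One on-disk detail worth noting before the source: each log subdirectory is named after its topic and partition, which is how loadLogs can recover the TopicAndPartition for every directory it finds. A minimal sketch of that naming convention (the helper below is illustrative, not the actual LogManager code):

import java.io.File

case class TopicAndPartition(topic: String, partition: Int)

//a log directory is named "<topic>-<partition>", e.g. "mytopic-0";
//split on the last '-' so topic names containing '-' still parse
def parseTopicPartitionName(dir: File): TopicAndPartition = {
  val name = dir.getName
  val i = name.lastIndexOf('-')
  TopicAndPartition(name.substring(0, i), name.substring(i + 1).toInt)
}

With that in mind, the LogManager class: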

/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
* All read and write operations are delegated to the individual log instances.
*
* The log manager maintains logs in one or more directories. New logs are created in the data directory
* with the fewest logs. No attempt is made to move partitions after the fact or balance based on
* size or I/O rate.
*
* A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
scheduler: Scheduler,
private val time: Time) extends Logging {
//kafka.utils.Pool, a wrapper around ConcurrentHashMap
private val logs = new Pool[TopicAndPartition, Log]() //one log per partition of a topic
/**
* Recover and load all logs in the given data directories
*/
private def loadLogs(dirs: Seq[File]) {...}
}
  LogSegment
  A log consists of segments; each segment pairs a message file (FileMessageSet, see below) with an index file (OffsetIndex). The append path shows how the sparse index is maintained as messages are written:
/**
* Append the given messages starting with the given offset; add an entry to the index if needed.
*/
@nonthreadsafe
def append(offset: Long, messages: ByteBufferMessageSet) {
if(messages.sizeInBytes > 0) {
// append an entry to the index (if needed)
if(bytesSinceLastIndexEntry > indexIntervalBytes) { //only some messages get an index entry (sparse index)
index.append(offset, log.sizeInBytes())  //write to the index file
this.bytesSinceLastIndexEntry = 0
}
// append the messages
log.append(messages) //write to the message-set file
this.bytesSinceLastIndexEntry += messages.sizeInBytes
}
}
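//Illustration (not part of the original source): with indexIntervalBytes = 4096
//and 1KB message batches, an index entry is written roughly once every five
//batches; offsets in between are located later by scanning the message file,
//which is what makes the index "sparse" and keeps it small.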
/**
* Read a message set from this segment beginning with the first offset >= startOffset. The message set will include
* no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
*
* @param startOffset A lower bound on the first offset to include in the message set we read
* @param maxSize The maximum number of bytes to include in the message set we read
* @param maxOffset An optional maximum offset for the message set we read
*
* @return The message set read or null if the startOffset is larger than the largest offset in this log.
*/
@threadsafe
def read(startOffset: Long, maxOffset: Option[Long], maxSize: Int): MessageSet = {   
val logSize = log.sizeInBytes // this may change, need to save a consistent copy
val startPosition = translateOffset(startOffset)
// calculate the length of the message set to read based on whether or not they gave us a maxOffset
val length =
maxOffset match {
case None =>
// no max offset, just use the max size they gave unmolested
maxSize
case Some(offset) => {
// there is a max offset, translate it to a file position and use that to calculate the max read size
if(offset < startOffset)
throw new IllegalArgumentException("Attempt to read with a maximum offset (%d) less than the start offset (%d).".format(offset, startOffset))
val mapping = translateOffset(offset, startPosition.position)
val endPosition =
if(mapping == null)
logSize // the max offset is off the end of the log, use the end of the file
else
mapping.position
min(endPosition - startPosition.position, maxSize)
}
}
log.read(startPosition.position, length) //read the message set out of the file
}
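//Hypothetical usage (illustration only): read at most 64KB of messages
//starting at logical offset 100, stopping before offset 150 if it is reached:
//  segment.read(startOffset = 100L, maxOffset = Some(150L), maxSize = 64 * 1024)
//with maxOffset = None the read is bounded by maxSize alone.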
/**
* Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log and index.
*
* @param maxMessageSize A bound on the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
*
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
def recover(maxMessageSize: Int): Int = {...}
}
  FileMessageSet
  The file in a segment that actually stores the log messages; it is read and written through a FileChannel.




/**
 * An on-disk message set. An optional start and end position can be applied to the message set
 * which will allow slicing a subset of the file.
 * @param file The file name for the underlying log data
 * @param channel the underlying file channel used
 * @param start A lower bound on the absolute position in the file from which the message set begins
 * @param end The upper bound on the absolute position in the file at which the message set ends
 * @param isSlice Should the start and end parameters be used for slicing?
 */
@nonthreadsafe
class FileMessageSet private[kafka](@volatile var file: File,
                                    private[log] val channel: FileChannel,
                                    private[log] val start: Int,
                                    private[log] val end: Int,
                                    isSlice: Boolean) extends MessageSet with Logging {...}
  OffsetIndex
  The index file of a segment. This was added in 0.8; before that, messages were identified directly by their physical offsets.
The new version switched to logical offsets, making physical positions transparent to users, which requires an index mapping logical offsets to physical positions.
For efficiency the index should live in memory, but given its potential size it is backed by a MappedByteBuffer (cf. the usage of Java RandomAccessFile).
As the class comment explains:
the index is sparse, i.e. not every message is guaranteed an entry in the index;
the index consists of 8-byte entries, a 4-byte logical offset plus a 4-byte physical position;
the logical offset stored is relative to the base offset, otherwise 4 bytes would not be enough.
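To make the 8-byte entry format concrete, here is a toy sketch (the names are illustrative; this is not the OffsetIndex code itself) of packing an entry into a buffer and reading it back:

import java.nio.ByteBuffer

//each entry is 8 bytes: a 4-byte offset relative to baseOffset plus a 4-byte file position
val baseOffset = 50L
val buffer = ByteBuffer.allocate(8 * 1024) //room for 1024 entries

def writeEntry(offset: Long, position: Int): Unit = {
  buffer.putInt((offset - baseOffset).toInt) //e.g. offset 55 is stored as 5
  buffer.putInt(position)
}

def readEntry(n: Int): (Long, Int) = //the nth entry starts at byte n * 8
  (baseOffset + buffer.getInt(n * 8), buffer.getInt(n * 8 + 4))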




/**
 * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
 * that is it may not hold an entry for all messages in the log.
 *
 * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
 *
 * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
 * to locate the offset/location pair for the greatest offset less than or equal to the target offset.
 *
 * Index files can be opened in two ways: either as an empty, mutable index that allows appends or
 * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
 * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
 *
 * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
 *
 * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the
 * message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
 * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way lets us use
 * only 4 bytes for the offset.
 *
 * The frequency of entries is up to the user of this class.
 *
 * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
 * storage format.
 */
class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
  private val lock = new ReentrantLock  //operations on the index file are protected by this lock

  /* initialize the memory mapping for this index */
  private var mmap: MappedByteBuffer =  //a MappedByteBuffer is used so large index files can be handled
    {
      val newlyCreated = file.createNewFile()
      val raf = new RandomAccessFile(file, "rw")
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
      ...
    }

  //read an entry's contents (offset and physical position) out of the buffer by byte offset
  /* return the nth offset relative to the base offset */
  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
  /* return the nth physical position */
  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)

  //binary-search for targetOffset, or for the closest offset below it
  /**
   * Find the largest offset less than or equal to the given targetOffset
   * and return a pair holding this offset and its corresponding physical file position.
   *
   * @param targetOffset The offset to look up.
   *
   * @return The offset found and the corresponding file position for this offset.
   * If the target offset is smaller than the least entry in the index (or the index is empty),
   * the pair (baseOffset, 0) is returned.
   */
  def lookup(targetOffset: Long): OffsetPosition = {...}

  /**
   * Get the nth offset mapping from the index
   * @param n The entry number in the index
   * @return The offset/position pair at that entry
   */
  def entry(n: Int): OffsetPosition = {
    maybeLock(lock) {
      if(n >= entries)
        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
      val idx = mmap.duplicate
      OffsetPosition(relativeOffset(idx, n), physical(idx, n))
    }
  }

  /**
   * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all previous entries.
   */
  def append(offset: Long, position: Int) {
    inLock(lock) {
      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
      if (size.get == 0 || offset > lastOffset) {
        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
        this.mmap.putInt((offset - baseOffset).toInt)
        this.mmap.putInt(position)
        this.size.incrementAndGet()
        this.lastOffset = offset
        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
      } else {
        throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
          .format(offset, entries, lastOffset, file.getAbsolutePath))
      }
    }
  }
}
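The body of lookup is elided above; conceptually it is a binary search for the greatest entry whose offset is less than or equal to the target. A sketch of that search in terms of the accessors shown above (illustrative, not the exact Kafka implementation; baseOffset, entries, relativeOffset and physical are assumed to be as in the class):

def lookup(idx: ByteBuffer, targetOffset: Long): OffsetPosition = {
  val relative = (targetOffset - baseOffset).toInt
  if(entries == 0 || relativeOffset(idx, 0) > relative)
    return OffsetPosition(baseOffset, 0) //target precedes the first entry, or the index is empty
  var lo = 0
  var hi = entries - 1
  while(lo < hi) {
    val mid = (lo + hi + 1) / 2 //round up so the search range always shrinks
    if(relativeOffset(idx, mid) <= relative) lo = mid else hi = mid - 1
  }
  OffsetPosition(baseOffset + relativeOffset(idx, lo), physical(idx, lo))
}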
  How exactly is a logical offset translated into a physical position?
  0.8 introduced logical offsets, so a translation between logical offsets and physical positions is needed.
The simple approach would be a hashmap caching every offset; the problem is that this costs too much space.
Kafka's approach is a segmented, sparse index: binary-search the index to find the starting position of the containing range, then scan the message file from there to locate the exact position. It is a classic time-for-space design.
  1. LogSegment.translateOffset
First, find an approximate physical position from the index file.
As noted above, for efficiency the index does not hold an entry for every offset, so the chance of finding the exact physical position directly in the index is small, but the closest preceding position can always be found.
If the index granularity seems too coarse, a startingFilePosition can be supplied to begin the search from.
The exact physical position then has to be found by continuing the search in the MessageSet file.


  /**
* Find the physical file position for the first message with offset >= the requested offset.
*
* The lowerBound argument is an optimization that can be used if we already know a valid starting position
* in the file higher than the greatest-lower-bound from the index.
*
* @param offset The offset we want to translate
* @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
* when omitted, the search will begin at the position in the offset index.
*
* @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
*/
@threadsafe
private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
val mapping = index.lookup(offset) //find the approximate physical position from the index
log.searchFor(offset, max(mapping.position, startingFilePosition)) //then scan the message file for the exact position
}
  2. FileMessageSet.searchFor
In the message set, each message consists of an overhead (MessageSize + Offset) followed by the message itself.
searchFor starts at startingPosition and walks the messages one by one, reading the offset out of each overhead and comparing it with the target, until the target offset is found.
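The framing that searchFor depends on can be shown with a toy writer (illustrative only): on disk each entry is an 8-byte offset, then a 4-byte size, then the message bytes, so MessageSet.LogOverhead is 12 bytes.

import java.nio.ByteBuffer

//frame one message the way it is laid out on disk:
//[offset: 8 bytes][messageSize: 4 bytes][message bytes]
def frame(offset: Long, message: Array[Byte]): ByteBuffer = {
  val buf = ByteBuffer.allocate(8 + 4 + message.length)
  buf.putLong(offset)        //what searchFor reads back with getLong()
  buf.putInt(message.length) //what searchFor reads back with getInt()
  buf.put(message)
  buf.rewind()
  buf
}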


  /**
* Search forward for the file position of the first message with an offset greater than or equal to the target offset
* and return its physical position. If no such message is found, return null.
* @param targetOffset The offset to search for.
* @param startingPosition The starting position in the file to begin searching from.
*/
def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
var position = startingPosition
val buffer = ByteBuffer.allocate(MessageSet.LogOverhead) // LogOverhead = MessageSizeLength + OffsetLength
val size = sizeInBytes()
while(position + MessageSet.LogOverhead < size) { //walk the messages one by one, starting from position
buffer.rewind()
channel.read(buffer, position)
buffer.rewind()
val offset = buffer.getLong()
if(offset >= targetOffset)  //found the target offset?
return OffsetPosition(offset, position)
val messageSize = buffer.getInt()
position += MessageSet.LogOverhead + messageSize //advance to the next message
}
null
}
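Putting the two steps together with concrete numbers: suppose baseOffset = 50 and the index holds the entries (5, 0) and (20, 4096). To translate logical offset 57, lookup binary-searches the index and returns the entry (5, 0), i.e. offset 55 at file position 0; searchFor then scans the message file from position 0, reading each 12-byte overhead and skipping messageSize bytes, until it reaches the first message whose offset is >= 57. Only the few messages between two index entries are ever scanned, which is exactly the time-for-space trade-off described above.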
