This article gives a brief overview of how ReplicaManager handles the local write and the wait-for-follower-fetch phase when a broker receives a ProduceRequest.
Each broker has two management objects, ReplicaManager and LogManager. ReplicaManager manages all replicas hosted on this broker; in Kafka every replica corresponds one-to-one with a log instance, i.e. each replica maps to exactly one Log object.
Manager | Component | Sub-component
---|---|---
LogManager | Log | LogSegment
ReplicaManager | Partition | Replica
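As a rough mental model of that table, here is a simplified sketch of the containment hierarchy. These case classes are my own illustration, not Kafka's real types; one LogManager and one ReplicaManager exist per broker.

```scala
// Hypothetical sketch of the containment above; not Kafka's real classes.
case class LogSegment(baseOffset: Long)
case class Log(dir: String, segments: Vector[LogSegment])      // managed by LogManager

case class Replica(brokerId: Int, logEndOffset: Long)           // follower state tracked by the leader
case class Partition(topic: String, id: Int,
                     localLog: Option[Log],                     // the local replica's Log, if hosted here
                     remoteReplicas: Map[Int, Replica])         // managed by ReplicaManager

class LogManager(val logs: Map[(String, Int), Log])
class ReplicaManager(val partitions: Map[(String, Int), Partition])
```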
The producer's acks setting is configurable. With acks=0 the producer does not wait for any broker response after sending; with acks=1 it only waits for the leader partition's acknowledgement; with acks=all (-1) it waits until every replica in the current ISR has the data. Combining acks=all with the dynamic ISR set is what provides the consistency guarantee: since every replica in the ISR is guaranteed to have caught up to the acknowledged offsets, a new leader can safely be elected from the ISR when the current leader fails.
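For reference, a minimal producer configuration that opts into the strongest setting could look like the sketch below; the bootstrap address and topic name are placeholders.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object AcksAllExample extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder address
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  // acks=all (-1): the leader only acknowledges after every replica in the ISR has the record.
  props.put(ProducerConfig.ACKS_CONFIG, "all")

  val producer = new KafkaProducer[String, String](props)
  // .get() blocks until the broker acknowledges according to the acks setting
  producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value")).get()
  producer.close()
}
```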
KafkaApis is the entry point for request handling on the Kafka server: it dispatches the requests handed over by KafkaRequestHandler to the various handle*() methods. Here we only care about the produce request:
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
The first part of handleProduceRequest validates the request and defines sendResponseCallback and processingStatsCallback (mainly for recording metrics); the core of the method is the call to replicaManager.appendRecords:
replicaManager.appendRecords(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.CLIENT,
entriesPerPartition = authorizedRequestInfo,
requestLocal = requestLocal,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback,
transactionalId = produceRequest.transactionalId(),
transactionStatePartition = transactionStatePartition)
The call path from replicaManager.appendRecords down to the partition is roughly:

- appendEntries
  - appendLogResults
    - appendToLocalLog
      - appendRecordsToLeader
appendRecordsToLeader first takes a read lock on leaderIsrUpdateLock, so that the ISR cannot be updated concurrently and get out of sync with what this write observes. Each Partition holds a log object; since this is an append on the leader, the code first checks that the local log is actually the leader log, and then (for acks=-1) checks that partitionState.isr.size >= leaderLog.config.minInSyncReplicas. If these checks pass it calls appendAsLeader; how the log is actually written to local disk is not covered here.
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
// leaderLogIfLocal is defined only while this broker is the leader of the partition
leaderLogIfLocal match {
case Some(leaderLog) =>
val minIsr = leaderLog.config.minInSyncReplicas
val inSyncSize = partitionState.isr.size
// Avoid writing to leader if there are not enough insync replicas to make it safe
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
}
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal, verificationGuard)
// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderLog))
case None =>
throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
}
info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
}
- delayedProduceRequestRequired
appendRecords then checks requiredAcks: if it is -1, the request only counts as successful once all replicas in the ISR have caught up. In that case a DelayedProduce request is built and delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) is called, asynchronously waiting for the leader's followers to catch up.
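Whether this delayed path is needed at all is decided by delayedProduceRequestRequired (the bullet above). The sketch below is a close paraphrase of the check in ReplicaManager, so treat the exact signature as approximate: acks must be -1, the request must carry data, and at least one partition must have been appended locally without error.

```scala
// Approximate shape of the check (signature simplified):
// acks == -1, there is data to wait for, and at least one local append succeeded.
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}
```

When it returns true, the ProduceMetadata and DelayedProduce are built: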
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
The DelayedProduce wraps, for each TopicPartition, a ProducePartitionStatus whose required offset is the lastOffset + 1 of the records just written to the leader replica; it also carries the responseCallback:
def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
appendResult.map { case (topicPartition, result) =>
topicPartition -> ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(
result.error,
result.info.firstOffset.map[Long](_.messageOffset)
.orElse(-1L),
result.info.lastOffset,
result.info.logAppendTime,
result.info.logStartOffset,
result.info.recordErrors,
if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
)
) // response status
}
}
producerRequestKeys are the watch keys: if the followers have not caught up yet, the DelayedProduce is registered under these TopicPartition keys in the Watchers, and it is removed once the operation completes.
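In ReplicaManager the watch keys are simply built from the partitions of the produce request, roughly like this:

```scala
// One watch key per partition in the request; TopicPartitionOperationKey is the key type
// the purgatory's Watchers index on (close paraphrase of the line in ReplicaManager).
val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
```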
DelayedOperation
DelayedOperationPurgatory manages delayed operations, using a timing wheel to implement their timeout and asynchronous completion. DelayedOperation has several concrete subclasses (DelayedProduce, DelayedFetch, etc.), each corresponding to a different kind of delayed request. We start from the place where ReplicaManager calls into it:
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
tryCompleteElseWatch does what its name says: it first tries to complete the operation; if it cannot, it registers the operation under each watch key in the Watchers:
if (operation.safeTryCompleteOrElse {
watchKeys.foreach(key => watchForOperation(key, operation))
if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
}) return true
safeTryCompleteOrElse first acquires the operation's lock; this lock is lockOpt.getOrElse(new ReentrantLock), i.e. a ReentrantLock unless the caller supplied its own lock.
def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {
if (tryComplete()) true
else {
f
// last completion check
tryComplete()
}
}
The conditions under which tryComplete() can succeed are spelled out in the source comment:
/**
* The delayed produce operation can be completed if every partition
* it produces to is satisfied by one of the following:
*
* Case A: Replica not assigned to partition
* Case B: Replica is no longer the leader of this partition
* Case C: This broker is the leader:
* C.1 - If there was a local error thrown while checking if at least requiredAcks
* replicas have caught up to this operation: set an error in response
* C.2 - Otherwise, set the response with no error.
*/
Once every partition satisfies at least one of these cases, forceComplete() is called:
def tryComplete(): Boolean = {
// check for each partition if it still has pending acks
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) =
replicaManager.getPartitionOrError(topicPartition) match {
case Left(err) =>
// Case A
(false, err)
case Right(partition) =>
partition.checkEnoughReplicasReachOffset(
status.requiredOffset)
}
// Case B || C.1 || C.2
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
}
}
// check if every partition has satisfied at least one of case A, B or C
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
}
getPartitionOrError checks whether this broker still hosts the partition as an online replica, and whether the partition's topic still exists according to this broker's metadata:
def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
getPartition(topicPartition) match {
case HostedPartition.Online(partition) =>
Right(partition)
case HostedPartition.Offline =>
Left(Errors.KAFKA_STORAGE_ERROR)
case HostedPartition.None if metadataCache.contains(topicPartition) =>
// The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
// forces clients to refresh metadata to find the new location. This can happen, for example,
// during a partition reassignment if a produce request from the client is sent to a broker after
// the local replica has been deleted.
Left(Errors.NOT_LEADER_OR_FOLLOWER)
case HostedPartition.None =>
Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}
checkEnoughReplicasReachOffset is the important method here: it checks whether the ISR replicas have reached the offset required by this write; the key condition is leaderLog.highWatermark >= requiredOffset.
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
leaderLogIfLocal match {
case Some(leaderLog) =>
// keep the current immutable replica list reference
val curMaximalIsr = partitionState.maximalIsr
if (isTraceEnabled) {
def logEndOffsetString: ((Int, Long)) => String = {
case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
}
val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.stateSnapshot.logEndOffset))
val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition {
_._2 >= requiredOffset
}
trace(s"Progress awaiting ISR acks for offset $requiredOffset: " +
s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
}
val minIsr = leaderLog.config.minInSyncReplicas
if (leaderLog.highWatermark >= requiredOffset) {
/*
* The topic may be configured not to accept messages if there are not enough replicas in ISR
* in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
*/
if (minIsr <= curMaximalIsr.size)
(true, Errors.NONE)
else
(true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
} else
(false, Errors.NONE)
case None =>
(false, Errors.NOT_LEADER_OR_FOLLOWER)
}
}
Once leaderLog.highWatermark >= requiredOffset holds, this DelayedOperation is completed: the timeout task is cancelled and the response callback is invoked.
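Both of those steps happen in the generic forceComplete() of the DelayedOperation base class; a simplified sketch (the completed flag is an atomic boolean, so completion happens at most once):

```scala
// Simplified sketch of DelayedOperation.forceComplete(): complete at most once,
// cancel the pending timer task, then run the subclass's onComplete().
def forceComplete(): Boolean = {
  if (completed.compareAndSet(false, true)) {
    cancel()      // removes the operation from the timing wheel, so it will not expire
    onComplete()  // for DelayedProduce this ends up invoking the responseCallback
    true
  } else {
    false
  }
}
```

onComplete() for DelayedProduce then simply maps each partition's status into the response and calls responseCallback: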
override def onComplete(): Unit = {
val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(responseStatus)
}
The above covers the case where tryComplete succeeds. If the leader's high watermark has not yet reached requiredOffset, the operation stays registered under its TopicPartition keys in the Watchers and is re-checked later, as follower fetches advance the high watermark.
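A sketch of that trigger path is shown below. The wrapper name onLeaderHighWatermarkAdvanced is hypothetical; checkAndComplete and TopicPartitionOperationKey are the real purgatory primitives used when the follower-fetch path advances the high watermark.

```scala
// Hypothetical wrapper; checkAndComplete(key) re-runs tryComplete() on every operation
// currently watching this partition and completes the ones that now can.
def onLeaderHighWatermarkAdvanced(topicPartition: TopicPartition): Unit = {
  val key = TopicPartitionOperationKey(topicPartition)
  delayedProducePurgatory.checkAndComplete(key)
  delayedFetchPurgatory.checkAndComplete(key)
}
```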
The diagram of how the broker server handles a ProduceRequest is shown below: