Kafka-replica-server

  本文简单介绍当Broker接受到producerRequest的时候,ReplicaManager是如何处理的本地写入和WaitForFollowerFetch

  每个Partition都有两个管理对象ReplicaManager 和 LogManger,ReplicaManager的作用是管理这台 broker 上的所有副本(replica)。在 Kafka 中,每个副本(replica)都会跟日志实例(Log 对象)一一对应,一个副本会对应一个 Log 对象。

管理对象 组成部分
日志管理器(LogManager) 日志(Log) 日志分段(LogSegment)
副本管理器(ReplicaManager) 分区(Partition) 副本(Replica)

  生产者的ack是可以配置的,当acks=0时,生产者发送数据后,就不会等待服务器的响应,当acks=1时,生产者只需要等待Leader Partition的响应,当acks=all时,就会等待配置的isr集合中所有副本的返回,从而通过利用acks=all和isr动态集合来确保数据一致性问题:当Leader节点失效后,由于保证了所有ISR集合中所有副本的log end offset都是一致的,从而可以从ISR集合中选举新的Leader。

  KafkaApis是Kafka服务器处理请求的入口类,负责将KafkaRequestHandler传递过来的请求分发到不同的handle*()处理方法中,这一节主要是关注producer的请求:

1
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)

  handleProduceRequest中前面是验证request,定义sendResponseCallback,processingStatsCallback,其主要是为了记录metrics,之后就是主要的replicaManager.appendRecords:

1
2
3
4
5
6
7
8
9
10
11
replicaManager.appendRecords(  
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.CLIENT,
entriesPerPartition = authorizedRequestInfo,
requestLocal = requestLocal,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback,
transactionalId = produceRequest.transactionalId(),
transactionStatePartition = transactionStatePartition)
  1. appendEntries:
    • appendLogResults
      • appendToLocalLog
        • appendRecordsToLeader

  先对leaderIsrUpdateLock用读锁,避免这个时候更新Isr集合,造成副本Replica同步与Isr集合不一致的问题,每个Partition中都有一个log的对象,因为是appendToLeader,需要先判断log是不是LeaderLog,然后check此时partitionState的isr.size>=leaderLog.config.minInSyncReplicas,如果满足就appendAsLeader,具体log中是如何写进本地磁盘的先不深入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
// leaderLogIfLocal 这个变量是在 LeaderAndIsr 的构造函数中初始化的
leaderLogIfLocal match {
case Some(leaderLog) =>
val minIsr = leaderLog.config.minInSyncReplicas
val inSyncSize = partitionState.isr.size

// Avoid writing to leader if there are not enough insync replicas to make it safe
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
}

val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal, verificationGuard)

// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderLog))

case None =>
throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
}

info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
}
  1. delayedProduceRequestRequired

  检查requiredAcks,如果是-1,则要等Isr集合中所有的副本都同步完成,这次的request才算是发送成功。构建DelayedProduce的request,然后调用delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)请求,异步的等待leader的其他副本同步完成。

1
2
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)  
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

  其中delayedProduce封装的是每个TopicPartition对应的写入Leader副本的lastOffset + 1的记录,其他还有responseCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
appendResult.map { case (topicPartition, result) =>
topicPartition -> ProducePartitionStatus(
result.info.lastOffset + 1, // required offset
new PartitionResponse(
result.error,
result.info.firstOffset.map[Long](_.messageOffset)
.orElse(-1L),
result.info.lastOffset,
result.info.logAppendTime,
result.info.logStartOffset,
result.info.recordErrors,
if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
)
) // response status
}
}

  producerRequestKeys指的是如果其他副本还没有同步完成,就将这些topPartition加入到Watchers中,等异步完成的时候再移除。

DelayedOperation

  DelayedOperationPurgatory是用来管理延迟操作,通过时间轮来完成定时任务或者异步任务。DelayedOperation有四个实现类,分别表示四类不同的延迟操作,也对应了四种不同的请求,我们先从replicaManager调用的地方开始:

1
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

  tryCompleteElseWatch如其名字所示,先check有没有complete,否则将topPartition加入到Watchers中;

1
2
3
4
if (operation.safeTryCompleteOrElse {
watchKeys.foreach(key => watchForOperation(key, operation))
if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
}) return true

  在调用safeTryCompleteOrElse时,会先抢占锁,这里的lock是自旋锁lockOpt.getOrElse(new ReentrantLock)

1
2
3
4
5
6
7
8
def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {  
if (tryComplete()) true
else {
f
// last completion check
tryComplete()
}
}

tryComplete()满足的条件在源码的注释中有写道:

1
2
3
4
5
6
7
8
9
10
11
/**  
* The delayed produce operation can be completed if every partition
* it produces to is satisfied by one of the following:
*
* Case A: Replica not assigned to partition
* Case B: Replica is no longer the leader of this partition
* Case C: This broker is the leader:
* C.1 - If there was a local error thrown while checking if at least requiredAcks
* replicas have caught up to this operation: set an error in response
* C.2 - Otherwise, set the response with no error.
*/

如果满足其中任何一个条件,就进行forceComplete:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def tryComplete(): Boolean = {  
// check for each partition if it still has pending acks
produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>

// skip those partitions that have already been satisfied
if (status.acksPending) {
val (hasEnough, error) =
replicaManager.getPartitionOrError(topicPartition) match {
case Left(err) =>
// Case A
(false, err)

case Right(partition) =>
partition.checkEnoughReplicasReachOffset(
status.requiredOffset)
}

// Case B || C.1 || C.2
if (error != Errors.NONE || hasEnough) {
status.acksPending = false
status.responseStatus.error = error
}
}
}

// check if every partition has satisfied
//at least one of case A, B or C
if (!produceMetadata.produceStatus.values.exists(_.acksPending))
forceComplete()
else
false
}

getPartitionOrError是为了判断此broker是不是该副本的leader,以及此partition是不是还属于此Topic:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
getPartition(topicPartition) match {
case HostedPartition.Online(partition) =>
Right(partition)

case HostedPartition.Offline =>
Left(Errors.KAFKA_STORAGE_ERROR)

case HostedPartition.None if metadataCache.contains(topicPartition) =>
// The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
// forces clients to refresh metadata to find the new location. This can happen, for example,
// during a partition reassignment if a produce request from the client is sent to a broker after
// the local replica has been deleted.
Left(Errors.NOT_LEADER_OR_FOLLOWER)

case HostedPartition.None =>
Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
}
}

  checkEnoughReplicasReachOffset这个方法比较重要,主要是了检查ISR集合的offset是否已经到了Leader这次写入请求的offset,主要的check条件是leaderLog.highWatermark >= requiredOffset

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
leaderLogIfLocal match {
case Some(leaderLog) =>
// keep the current immutable replica list reference
val curMaximalIsr = partitionState.maximalIsr

if (isTraceEnabled) {
def logEndOffsetString: ((Int, Long)) => String = {
case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
}

val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.stateSnapshot.logEndOffset))
val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition {
_._2 >= requiredOffset
}

trace(s"Progress awaiting ISR acks for offset $requiredOffset: " +
s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
}

val minIsr = leaderLog.config.minInSyncReplicas
if (leaderLog.highWatermark >= requiredOffset) {
/*
* The topic may be configured not to accept messages if there are not enough replicas in ISR
* in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk
*/
if (minIsr <= curMaximalIsr.size)
(true, Errors.NONE)
else
(true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
} else
(false, Errors.NONE)
case None =>
(false, Errors.NOT_LEADER_OR_FOLLOWER)
}
}

  如果满足leaderLog.highWatermark >= requiredOffset,则complete这次的DelayOperation,取消timeout任务,调response的callback

1
2
3
4
override def onComplete(): Unit = {
val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
responseCallback(responseStatus)
}

  上面介绍的tryComplete成功的情况,如果此时Leader的高水位并没有达到requiredOffset,那么就会将topicPartition加入到Watchers中,那么之后watchers就会等副本的fetch完成来进行更新:

Broker Server HandleProducerRequest 的diagram如下: