

  每个Partition都有两个管理对象ReplicaManager 和 LogMangerReplicaManager的作用是管理这台 broker 上的所有副本replica在 Kafka 中每个副本replica都会跟日志实例Log 对象一一对应一个副本会对应一个 Log 对象

管理对象 组成部分
日志管理器LogManager 日志Log 日志分段LogSegment
副本管理器ReplicaManager 分区Partition 副本Replica

  生产者的ack是可以配置的当acks=0时生产者发送数据后就不会等待服务器的响应当acks=1时生产者只需要等待Leader Partition的响应当acks=all时就会等待配置的isr集合中所有副本的返回从而通过利用acks=all和isr动态集合来确保数据一致性问题当Leader节点失效后由于保证了所有ISR集合中所有副本的log end offset都是一致的从而可以从ISR集合中选举新的Leader


case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)


	timeout = produceRequest.timeout.toLong,  
	requiredAcks = produceRequest.acks,  
	internalTopicsAllowed = internalTopicsAllowed,  
	origin = AppendOrigin.CLIENT,  
	entriesPerPartition = authorizedRequestInfo,  
	requestLocal = requestLocal,  
	responseCallback = sendResponseCallback,  
	recordConversionStatsCallback = processingStatsCallback,  
	transactionalId = produceRequest.transactionalId(),  
	transactionStatePartition = transactionStatePartition)
  1. appendEntries:
    • appendLogResults
      • appendToLocalLog
        • appendRecordsToLeader


def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
        requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
        val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
        // leaderLogIfLocal 这个变量是在 LeaderAndIsr 的构造函数中初始化的  
        leaderLogIfLocal match {
        case Some(leaderLog) =>
        val minIsr = leaderLog.config.minInSyncReplicas
        val inSyncSize = partitionState.isr.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe  
        if (inSyncSize < minIsr && requiredAcks == -1) {
        throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
        s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")

        val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
        interBrokerProtocolVersion, requestLocal, verificationGuard)

        // we may need to increment high watermark since ISR could be down to 1  
        (info, maybeIncrementLeaderHW(leaderLog))

        case None =>
        throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
        .format(topicPartition, localBrokerId))

        info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
  1. delayedProduceRequestRequired

  检查requiredAcks,如果是-1则要等Isr集合中所有的副本都同步完成这次的request才算是发送成功构建DelayedProduce的request然后调用delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)请求异步的等待leader的其他副本同步完成

val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)  
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

  其中delayedProduce封装的是每个TopicPartition对应的写入Leader副本的lastOffset + 1的记录其他还有responseCallback

def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
  appendResult.map { case (topicPartition, result) =>
    topicPartition -> ProducePartitionStatus(
      result.info.lastOffset + 1, // required offset  
      new PartitionResponse(
        if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
    ) // response status  




delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)


if (operation.safeTryCompleteOrElse {
  watchKeys.foreach(key => watchForOperation(key, operation))
  if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
}) return true

  在调用safeTryCompleteOrElse时会先抢占锁这里的lock是自旋锁lockOpt.getOrElse(new ReentrantLock)

def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {  
	if (tryComplete()) true  
	else {  
		// last completion check  


* The delayed produce operation can be completed if every partition  
* it produces to is satisfied by one of the following:  
* Case A: Replica not assigned to partition  
* Case B: Replica is no longer the leader of this partition  
* Case C: This broker is the leader:  
* C.1 - If there was a local error thrown while checking if at least requiredAcks  
* replicas have caught up to this operation: set an error in response  
* C.2 - Otherwise, set the response with no error.  


def tryComplete(): Boolean = {  
// check for each partition if it still has pending acks  
	produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>  

// skip those partitions that have already been satisfied  
	if (status.acksPending) {  
		val (hasEnough, error) = 
			replicaManager.getPartitionOrError(topicPartition) match {  
		case Left(err) =>  
			// Case A  
			(false, err)  
		case Right(partition) =>  
		// Case B || C.1 || C.2  
		if (error != Errors.NONE || hasEnough) {  
			status.acksPending = false  
			status.responseStatus.error = error  
	// check if every partition has satisfied 
	//at least one of case A, B or C  
	if (!produceMetadata.produceStatus.values.exists(_.acksPending))


  def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
    getPartition(topicPartition) match {
      case HostedPartition.Online(partition) =>

      case HostedPartition.Offline =>

      case HostedPartition.None if metadataCache.contains(topicPartition) =>
        // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
        // forces clients to refresh metadata to find the new location. This can happen, for example,
        // during a partition reassignment if a produce request from the client is sent to a broker after
        // the local replica has been deleted.

      case HostedPartition.None =>

  checkEnoughReplicasReachOffset这个方法比较重要主要是了检查ISR集合的offset是否已经到了Leader这次写入请求的offset,主要的check条件是leaderLog.highWatermark >= requiredOffset

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderLogIfLocal match {
    case Some(leaderLog) =>
      // keep the current immutable replica list reference  
      val curMaximalIsr = partitionState.maximalIsr

      if (isTraceEnabled) {
        def logEndOffsetString: ((Int, Long)) => String = {
          case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"

        val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
        val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.stateSnapshot.logEndOffset))
        val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
        val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition {
          _._2 >= requiredOffset

        trace(s"Progress awaiting ISR acks for offset $requiredOffset: " +
          s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
          s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")

      val minIsr = leaderLog.config.minInSyncReplicas
      if (leaderLog.highWatermark >= requiredOffset) {
        * The topic may be configured not to accept messages if there are not enough replicas in ISR  
        * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk  
        if (minIsr <= curMaximalIsr.size)
          (true, Errors.NONE)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_OR_FOLLOWER)

  如果满足leaderLog.highWatermark >= requiredOffset则complete这次的DelayOperation取消timeout任务调response的callback

override def onComplete(): Unit = {
  val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }


Broker Server HandleProducerRequest 的diagram如下