Kafka-replica-server

  This post is a quick walkthrough of how ReplicaManager handles a ProduceRequest on the broker side: the local append and the wait for follower fetches (WaitForFollowerFetch).

  Each partition is managed through two objects: ReplicaManager and LogManager. ReplicaManager manages all replicas on this broker; in Kafka every replica is paired one-to-one with a log instance, i.e. one replica corresponds to one Log object.

Manager          Components
LogManager       Log → LogSegment
ReplicaManager   Partition → Replica

  The producer's acks setting is configurable. With acks=0 the producer does not wait for any broker response after sending; with acks=1 it only waits for the leader partition's acknowledgement; with acks=all it waits for every replica in the current ISR set. Combining acks=all with the dynamic ISR set is what gives the consistency guarantee: all replicas in the ISR are guaranteed to have caught up to the same committed log end offset, so when the leader fails a new leader can be elected from the ISR.
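
  As a concrete client-side sketch (illustration only, not from this codebase), the setting above is just the producer's acks config; on the broker/topic side it pairs with min.insync.replicas, which we will meet again below as minInSyncReplicas. The broker address and topic name here are made up:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
props.put(ProducerConfig.ACKS_CONFIG, "all") // wait for the whole ISR (same as acks=-1)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// with acks=all this only returns after the leader has heard from every ISR replica
producer.send(new ProducerRecord("demo-topic", "key", "value")).get()
producer.close()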

  KafkaApis is the entry point for request handling on the Kafka server; it dispatches the requests handed over by KafkaRequestHandler to the various handle*() methods. Here we only follow the produce request:

case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
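
  For context, this case lives in the large apiKey match inside KafkaApis.handle; roughly (a sketch, other cases omitted and details vary by version):

override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  // dispatch on the request's API key; every request type has its own handle*() method
  request.header.apiKey match {
    case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
    case ApiKeys.FETCH   => handleFetchRequest(request)
    // ... all other ApiKeys cases elided ...
  }
}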

  The first part of handleProduceRequest validates the request and defines sendResponseCallback and processingStatsCallback (the latter mainly for recording metrics); the real work is then done by replicaManager.appendRecords:

replicaManager.appendRecords(  
	timeout = produceRequest.timeout.toLong,  
	requiredAcks = produceRequest.acks,  
	internalTopicsAllowed = internalTopicsAllowed,  
	origin = AppendOrigin.CLIENT,  
	entriesPerPartition = authorizedRequestInfo,  
	requestLocal = requestLocal,  
	responseCallback = sendResponseCallback,  
	recordConversionStatsCallback = processingStatsCallback,  
	transactionalId = produceRequest.transactionalId(),  
	transactionStatePartition = transactionStatePartition)
  1. appendEntries:
    • appendLogResults
      • appendToLocalLog
        • appendRecordsToLeader

  It first takes the read lock on leaderIsrUpdateLock, so that the ISR set cannot be updated concurrently and drift out of sync with what this append observes. Every Partition holds a log object; since this is an append on the leader path, it first checks that the local log is the leader log, then checks that partitionState.isr.size >= leaderLog.config.minInSyncReplicas; if that holds it calls appendAsLeader (how the log actually writes to local disk is not covered here).

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
                          requestLocal: RequestLocal, verificationGuard: Object = null): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    // leaderLogIfLocal is set up when this replica becomes leader (on the LeaderAndIsr path)
    leaderLogIfLocal match {
      case Some(leaderLog) =>
        val minIsr = leaderLog.config.minInSyncReplicas
        val inSyncSize = partitionState.isr.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException(s"The size of the current ISR ${partitionState.isr} " +
            s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
        }

        val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
          interBrokerProtocolVersion, requestLocal, verificationGuard)

        // we may need to increment high watermark since ISR could be down to 1
        (info, maybeIncrementLeaderHW(leaderLog))

      case None =>
        throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  info.copy(if (leaderHWIncremented) LeaderHwChange.INCREASED else LeaderHwChange.SAME)
}
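
  The last comment alludes to the high-watermark rule. As a conceptual sketch only (not the Partition source), the leader HW is essentially the minimum log end offset across the leader and the followers it still counts as in sync, which is why an append as leader can advance the HW immediately once the ISR has shrunk to the leader alone:

// Conceptual sketch, not Kafka code: HW = min LEO over the leader and its in-sync followers.
def conceptualHighWatermark(leaderLogEndOffset: Long, inSyncFollowerLogEndOffsets: Iterable[Long]): Long =
  (Iterable(leaderLogEndOffset) ++ inSyncFollowerLogEndOffsets).min
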
  2. delayedProduceRequestRequired

  Check requiredAcks: if it is -1, the request only counts as successful once every replica in the ISR has caught up to this write. A DelayedProduce request is built and handed to delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys), so the request waits asynchronously for the leader's other replicas to finish fetching:

val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)  
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
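
  The gate for this path is the delayedProduceRequestRequired check mentioned above; paraphrased from ReplicaManager (treat it as a sketch, the exact shape varies by version), it requires acks = -1, a non-empty request, and at least one partition that appended locally without error:

private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  // acks=-1, the request actually carries data, and not every partition failed locally
  requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}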

  What delayedProduce wraps is, for each TopicPartition, the lastOffset + 1 of the records just written to the leader replica (the required offset), together with the responseCallback and the per-partition response status:

def produceStatusResult(appendResult: Map[TopicPartition, LogAppendResult], useCustomMessage: Boolean): Map[TopicPartition, ProducePartitionStatus] = {
  appendResult.map { case (topicPartition, result) =>
    topicPartition -> ProducePartitionStatus(
      result.info.lastOffset + 1, // required offset  
      new PartitionResponse(
        result.error,
        result.info.firstOffset.map[Long](_.messageOffset)
          .orElse(-1L),
        result.info.lastOffset,
        result.info.logAppendTime,
        result.info.logStartOffset,
        result.info.recordErrors,
        if (useCustomMessage) result.exception.get.getMessage else result.info.errorMessage
      )
    ) // response status  
  }
}

  producerRequestKeys are the watch keys: if the other replicas have not finished catching up yet, these topicPartitions are added to the Watchers and removed again once the delayed operation completes asynchronously.
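
  In the source the keys are simply built from the partitions in the request, roughly (sketch, names as in ReplicaManager):

val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq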

DelayedOperation

  DelayedOperationPurgatory manages delayed operations: it uses a timing wheel to drive timeouts and asynchronous completion. DelayedOperation has four implementation classes, one per kind of delayed operation, each matching a different request type. We start from the call made in ReplicaManager:

delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
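
  For orientation, a toy subclass (illustration only, not a class from the code base) shows the contract the purgatory relies on: tryComplete() may call forceComplete(), which invokes onComplete() exactly once, while onExpiration() runs if the timeout fires first:

import kafka.server.DelayedOperation

class DelayedDemo(delayMs: Long, isSatisfied: () => Boolean, respond: () => Unit)
  extends DelayedOperation(delayMs) {

  // called by the purgatory and its watchers to see whether we can finish early
  override def tryComplete(): Boolean =
    if (isSatisfied()) forceComplete() else false

  // runs exactly once, either on early completion or on expiration
  override def onComplete(): Unit = respond()

  // extra hook when the timeout fires before the condition is met, e.g. bump a metric
  override def onExpiration(): Unit = ()
}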

  tryCompleteElseWatch does what its name says: it first checks whether the operation can already be completed, and otherwise adds the topicPartitions to the Watchers:

if (operation.safeTryCompleteOrElse {
  watchKeys.foreach(key => watchForOperation(key, operation))
  if (watchKeys.nonEmpty) estimatedTotalOperations.incrementAndGet()
}) return true

  safeTryCompleteOrElse first acquires the operation's lock; here lock is lockOpt.getOrElse(new ReentrantLock), i.e. a plain ReentrantLock unless the caller supplied one (for DelayedProduce it is the delayedProduceLock seen above).

def safeTryCompleteOrElse(f: => Unit): Boolean = inLock(lock) {  
	if (tryComplete()) true  
	else {  
		f  
		// last completion check  
		tryComplete()  
	}  
}

The conditions under which tryComplete() can succeed are spelled out in the source comment:

/**  
* The delayed produce operation can be completed if every partition  
* it produces to is satisfied by one of the following:  
*  
* Case A: Replica not assigned to partition  
* Case B: Replica is no longer the leader of this partition  
* Case C: This broker is the leader:  
* C.1 - If there was a local error thrown while checking if at least requiredAcks  
* replicas have caught up to this operation: set an error in response  
* C.2 - Otherwise, set the response with no error.  
*/

If every partition is satisfied by one of these cases, forceComplete is invoked:

def tryComplete(): Boolean = {
  // check for each partition if it still has pending acks
  produceMetadata.produceStatus.forKeyValue { (topicPartition, status) =>
    // skip those partitions that have already been satisfied
    if (status.acksPending) {
      val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
        case Left(err) =>
          // Case A
          (false, err)

        case Right(partition) =>
          partition.checkEnoughReplicasReachOffset(status.requiredOffset)
      }

      // Case B || C.1 || C.2
      if (error != Errors.NONE || hasEnough) {
        status.acksPending = false
        status.responseStatus.error = error
      }
    }
  }

  // check if every partition has satisfied at least one of case A, B or C
  if (!produceMetadata.produceStatus.values.exists(_.acksPending))
    forceComplete()
  else
    false
}

getPartitionOrError checks whether this broker still hosts the partition as a replica (and hence can be its leader) and whether the topic partition still exists at all:

  def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
    getPartition(topicPartition) match {
      case HostedPartition.Online(partition) =>
        Right(partition)

      case HostedPartition.Offline =>
        Left(Errors.KAFKA_STORAGE_ERROR)

      case HostedPartition.None if metadataCache.contains(topicPartition) =>
        // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
        // forces clients to refresh metadata to find the new location. This can happen, for example,
        // during a partition reassignment if a produce request from the client is sent to a broker after
        // the local replica has been deleted.
        Left(Errors.NOT_LEADER_OR_FOLLOWER)

      case HostedPartition.None =>
        Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
    }
  }

  checkEnoughReplicasReachOffset is the key method: it checks whether the ISR replicas have reached the offset required by this write to the leader, and the core condition is leaderLog.highWatermark >= requiredOffset:

def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
  leaderLogIfLocal match {
    case Some(leaderLog) =>
      // keep the current immutable replica list reference  
      val curMaximalIsr = partitionState.maximalIsr

      if (isTraceEnabled) {
        def logEndOffsetString: ((Int, Long)) => String = {
          case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
        }

        val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
        val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.stateSnapshot.logEndOffset))
        val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
        val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition {
          _._2 >= requiredOffset
        }

        trace(s"Progress awaiting ISR acks for offset $requiredOffset: " +
          s"acked: ${ackedReplicas.map(logEndOffsetString)}, " +
          s"awaiting ${awaitingReplicas.map(logEndOffsetString)}")
      }

      val minIsr = leaderLog.config.minInSyncReplicas
      if (leaderLog.highWatermark >= requiredOffset) {
        /*  
        * The topic may be configured not to accept messages if there are not enough replicas in ISR  
        * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk  
        */
        if (minIsr <= curMaximalIsr.size)
          (true, Errors.NONE)
        else
          (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
      } else
        (false, Errors.NONE)
    case None =>
      (false, Errors.NOT_LEADER_OR_FOLLOWER)
  }
}

  If leaderLog.highWatermark >= requiredOffset holds, this DelayedOperation is completed: the timeout task is cancelled and the response callback is invoked:

override def onComplete(): Unit = {
  val responseStatus = produceMetadata.produceStatus.map { case (k, status) => k -> status.responseStatus }
  responseCallback(responseStatus)
}
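
  Completion itself goes through forceComplete in the DelayedOperation base class, which (roughly, paraphrased from the source) flips an atomic completed flag, cancels the timer task, and then runs onComplete exactly once:

def forceComplete(): Boolean = {
  if (completed.compareAndSet(false, true)) {
    // cancel the timeout timer task so onExpiration will not run
    cancel()
    onComplete()
    true
  } else {
    // somebody else completed it first
    false
  }
}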

  The above covers the case where tryComplete succeeds. If at that point the leader's high watermark has not yet reached requiredOffset, the topicPartition stays registered in the Watchers, and the watchers are re-checked later as follower fetches advance the high watermark.
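
  That re-check is triggered from the leader side when a follower fetch moves the high watermark forward; roughly (a sketch, exact method names differ across versions), the ReplicaManager pokes the produce purgatory with the partition's key so that any watched DelayedProduce gets another tryComplete:

// hypothetical wrapper name; the call into the purgatory is the essential part
def completeDelayedProduceRequests(topicPartition: TopicPartition): Unit = {
  val key = TopicPartitionOperationKey(topicPartition)
  // re-evaluate every DelayedProduce watching this partition; completed ones are removed
  delayedProducePurgatory.checkAndComplete(key)
}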

The diagram of the broker-side handleProduceRequest flow is shown below: