// Excerpt of the KafkaProducer constructor. Argument lists are elided as `...`, and the rest of
// the try block, the catch block and the closing braces are not shown.
// In order, it builds: the partitioner, the key/value serializers, the interceptors, the
// RecordAccumulator, the ProducerMetadata, the Sender, and the KafkaThread that runs the Sender.
// NOTE(review): in this excerpt the keySerializer, valueSerializer, metadata and interceptors
// parameters are never read (the fields are created from config / new instances instead);
// the full source presumably prefers the passed-in instances when they are non-null — confirm.
KafkaProducer(ProducerConfig config,  
  Serializer<K> keySerializer,  
  Serializer<V> valueSerializer,  
  ProducerMetadata metadata,  
  KafkaClient kafkaClient,  
  ProducerInterceptors<K, V> interceptors,  
  Time time) {  
  try {  
  this.partitioner = config.getConfiguredInstance(...);  
  this.keySerializer = config.getConfiguredInstance(...);  
  this.valueSerializer = config.getConfiguredInstance(...);  
  // NOTE(review): the statement below is missing its trailing `;` in this excerpt.
  List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.createConfiguredInterceptors(...)  
  this.interceptors = new ProducerInterceptors<>(interceptorList);  
  // Records passed to send() are buffered in the accumulator until the Sender drains them.
  this.accumulator = new RecordAccumulator(...);  
  // NOTE(review): `(...}` looks like a typo for `(...);` in this excerpt.
  this.metadata = new ProducerMetadata(...}  
  // The injected kafkaClient and the metadata are handed to the Sender, which is then run on
  // its own KafkaThread (ioThread).
  this.sender = newSender(logContext, kafkaClient, this.metadata);  
  this.ioThread = new KafkaThread(ioThreadName, this.sender, true);  


// Excerpt of KafkaProducer.doSend. Visible steps: wait for the topic's metadata, serialize the
// key and value, compute a partition (possibly RecordMetadata.UNKNOWN_PARTITION), and append the
// record to the RecordAccumulator, returning the append result's future.
// Catch-block bodies, several closing braces and the declarations of `timestamp`, `headers`
// and `abortOnNewBatch` are elided from this excerpt.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
          // Append callback takes care of the following:
          // - call interceptors and user callback on completion
          // - remember partition that is calculated in RecordAccumulator.append
          AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record);

          try {
              // first make sure the metadata for the topic is available
              long nowMs = time.milliseconds();
              ClusterAndWaitTime clusterAndWaitTime;
              try {
                  clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
              } catch (KafkaException e) {
              // (catch body and closing brace elided in this excerpt)
              // Advance the clock by the time spent waiting for metadata, and deduct that time from
              // the maxBlockTimeMs budget; the remainder is what append() may still block for.
              nowMs += clusterAndWaitTime.waitedOnMetadataMs;
              long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
              Cluster cluster = clusterAndWaitTime.cluster;
              byte[] serializedKey;
              try {
                  serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
              } catch (ClassCastException cce) {
              // (catch body and closing brace elided in this excerpt)
              byte[] serializedValue;
              try {
                  serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
              } catch (ClassCastException cce) {
              // (catch body and closing brace elided in this excerpt)

              // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
              // which means that the RecordAccumulator would pick a partition using built-in logic (which may
              // take into account broker load, the amount of data produced to each partition, etc.).
              int partition = partition(record, serializedKey, serializedValue, cluster);
              // Append the record to the accumulator. Note, that the actual partition may be
              // calculated there and can be accessed via appendCallbacks.topicPartition.
              RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
                      serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
              // After append() the effective partition must be resolved, even if partition() above
              // returned UNKNOWN_PARTITION.
              assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

              if (result.batchIsFull || result.newBatchCreated) {
                  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
              // (per the log message, the sender wake-up call and the closing brace are elided here)
              return result.future;
              // handling exceptions and record the errors;
              // for API exceptions return them in the future,
              // for other exceptions throw directly
  • MetaData


  在KafkaProducer中，用来维护元数据的对象是ProducerMetadata。它有一个属性 Map<String, Long> topics = new HashMap<>();（即topics Map），并且封装了Metadata对象，其中包含了：

- MetadataCache cache
	- Map<Integer, Node> nodes;
	- Node controller;
	- Map<TopicPartition, PartitionMetadata> metadataByPartition;
	- Cluster clusterInstance;
- long metadataExpireMs


  • ProducerMetadata中封装的Metadata对象包含以下字段：

    - MetadataCache cache
    	- Map<Integer, Node> nodes;
    	- Node controller;
    	- Map<TopicPartition, PartitionMetadata> metadataByPartition;
    	// immutable clone for client read, from metadataByPartition
    	- Cluster clusterInstance;
    // refresh interval: metadata older than this is refreshed (metadata.max.age.ms)
    - long metadataExpireMs;
    - long lastRefreshMs;  
    - long lastSuccessfulRefreshMs;


     其中PartitionMetadata包含：
     - TopicPartition topicPartition;
     - Optional<Integer> leaderId;
     - Optional<Integer> leaderEpoch;
     - List<Integer> replicaIds;
     - List<Integer> inSyncReplicaIds;
     - List<Integer> offlineReplicaIds;


    // Excerpt of KafkaProducer.waitOnMetadata. Blocks until metadata for `topic` is known and, when
    // `partition` is non-null, until the topic is known to have more than `partition` partitions,
    // or until maxWaitMs has elapsed.
    // Returns the cluster snapshot together with the time waited (0 when cached metadata suffices).
    // Throws InvalidTopicException when the cached cluster lists the topic as invalid.
    // Several closing braces and the timeout/rethrow bodies are elided in this excerpt.
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
              // add topic to metadata topic list if it is not there already and reset expiry
              // (done by the metadata.add(topic, nowMs) call below)
              Cluster cluster = metadata.fetch();
              if (cluster.invalidTopics().contains(topic))
                  throw new InvalidTopicException(topic);
              metadata.add(topic, nowMs);
              Integer partitionsCount =cluster.partitionCountForTopic(topic);
              // Return cached metadata if we have it, and if the record's partition is either undefined  
              // or within the known partition range  
              if (partitionsCount != null && (partition == null || partition < partitionsCount))
                  return new ClusterAndWaitTime(cluster, 0);
              long remainingWaitMs = maxWaitMs;
              long elapsed = 0;
              // Issue metadata requests until we have metadata for the topic and the requested partition,  
              // or until maxWaitMs is exceeded. This is necessary in case the metadata  
              // is stale and the number of partitions for this topic has increased in the meantime.  
              long nowNanos = time.nanoseconds();
              do {
                  if (partition != null) {
                      log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
                  } else {
                      log.trace("Requesting metadata update for topic {}.", topic);
                  // (closing brace elided in this excerpt)
                  // Re-add the topic on every iteration so its expiry is reset relative to the current time.
                  metadata.add(topic, nowMs + elapsed);
                  int version = metadata.requestUpdateForTopic(topic);
                  try {
                      // Block until the metadata moves past `version` or remainingWaitMs runs out.
                      metadata.awaitUpdate(version, remainingWaitMs);
                  } catch (TimeoutException ex) {
                      // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs  
                  // (rethrow and closing brace elided in this excerpt)
                  cluster = metadata.fetch();
                  // Total time waited since the caller's nowMs.
                  elapsed = time.milliseconds() - nowMs;
                  if (elapsed >= maxWaitMs) {
                  // (timeout handling body and closing brace elided in this excerpt)
                  remainingWaitMs = maxWaitMs - elapsed;
                  partitionsCount = cluster.partitionCountForTopic(topic);
              } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
              // Record the total blocking time (nanosecond difference) in the producer metrics.
              producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
              return new ClusterAndWaitTime(cluster, elapsed);
    1. 先通过metadata.fetch()获取缓存的Cluster（其中包含PartitionInfo）
    2. 调用metadata.add(topic, nowMs)；如果是新Topic，则调用requestUpdateForNewTopics
    3. 设置needPartialUpdate = true
    4. 执行this.requestVersion++
    5. 更新（重置）该topic的过期时间
    6. 即topics.put(topic, nowMs + metadataIdleMs)
    7. 获取该topic的partition总数；若缓存中已有该topic，且目标partition未指定或在已知范围内，则直接返回
    8. 否则在do循环中阻塞等待获取最新的partition元数据
    9. 唤醒sender线程去拉取元数据
    10. 调用metadata.awaitUpdate(version, remainingWaitMs)阻塞等待
    11. 若超时则抛出异常，否则循环直到获取到该partition的元数据
  • RecordAccumulator

  在send方法中发送消息时，消息并不是直接发送到Broker，而是暂存在RecordAccumulator的ConcurrentMap<String, TopicInfo> topicInfoMap中。producer会不断地将消息缓存到TopicInfo的ProducerBatch队列中，之后sender线程会不断检查并批量发送符合条件的records。以下是append方法：

    // Excerpt of RecordAccumulator.append. Appends one record to the batch deque of the effective
    // partition of `topic`. When `partition` is RecordMetadata.UNKNOWN_PARTITION, the topic's
    // built-in partitioner picks the partition.
    // Fast path: under the deque lock, tryAppend() adds the record to an in-progress batch.
    // Slow path: a buffer is allocated WITHOUT holding the deque lock (free.allocate may block up to
    // maxTimeToBlock). The lock is then re-taken and appendNewBatch() is called.
    // If abortOnNewBatch is set and no in-progress batch accepted the record, it returns early
    // instead of allocating.
    // Elided in this excerpt: several closing braces, the retry statements after the
    // partitionChanged(...) checks, and the body of the finally block.
    public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {

      // One TopicInfo per topic, created lazily.
      TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

      // We keep track of the number of appending thread to make sure we do not miss batches in
      // abortIncompleteBatches().
      // NOTE(review): the code this comment describes (tracking appending threads) is not shown
      // in this excerpt.
      ByteBuffer buffer = null;
      if (headers == null) headers = Record.EMPTY_HEADERS;
      try {
          // Loop to retry in case we encounter partitioner's race conditions.
          while (true) {
              // If the message doesn't have any partition affinity, so we pick a partition based on the broker
              // availability and performance. Note, that here we peek current partition before we hold the
              // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
              // deque lock.
              final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
              final int effectivePartition;
              if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                  // NOTE(review): the method call on builtInPartitioner is truncated in this excerpt
                  // (presumably it peeks the current sticky partition info); confirm against the source.
                  partitionInfo = topicInfo.builtInPartitioner
                  effectivePartition = partitionInfo.partition();
              } else {
                  partitionInfo = null;
                  effectivePartition = partition;

              // Now that we know the effective partition, let the caller know.

              setPartition(callbacks, effectivePartition);

              // check if we have an in-progress batch
              Deque<ProducerBatch> dq = topicInfo.batches
                      .computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());

              // ArrayDeque is not thread-safe, so every access to dq happens under its monitor.
              synchronized (dq) {
                  // After taking the lock, validate that the partition hasn't changed and retry.
                  // (the retry statement for this `if` is elided in this excerpt)
                  if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))

                  RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);

                  // A null result means no in-progress batch could take the record; fall through to allocation.
                  if (appendResult != null) {
                      // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                      boolean enableSwitch = allBatchesFull(dq);

                      // NOTE(review): the start of this statement is elided; compare the identical
                      // updatePartitionInfo(...) call at the end of the slow path below.
                              partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);

                      return appendResult;

              // we don't have an in-progress record batch try to allocate a new batch
              if (abortOnNewBatch) {
                  // Return a result that will cause another call to append.
                  return new RecordAppendResult(null, false, false, true, 0);


              // Allocate at most once per call; on a retry iteration the buffer from the previous
              // iteration is reused.
              if (buffer == null) {
                  byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                  // Use at least batchSize, more if this single record needs a larger buffer.
                  int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                  // NOTE(review): this logs `partition`, which may still be UNKNOWN_PARTITION here;
                  // effectivePartition holds the resolved value.
                  log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
                  // This call may block if we exhausted buffer space.
                  buffer = free.allocate(size, maxTimeToBlock);
                  // Update the current time in case the buffer allocation blocked above.
                  // NOTE: getting time may be expensive, so calling it under a lock
                  // should be avoided.
                  nowMs = time.milliseconds();

              synchronized (dq) {
                  // After taking the lock, validate that the partition hasn't changed and retry.
                  // (the retry statement for this `if` is elided in this excerpt)
                  if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))

                  RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                  // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                  if (appendResult.newBatchCreated)
                      buffer = null;
                  // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                  boolean enableSwitch = allBatchesFull(dq);
                  topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                  return appendResult;
      // (finally body elided; per the comment above it presumably returns a still non-null
      // `buffer` to the free pool)
      } finally {


  TopicInfo（每个topic一个）包含：
  - ConcurrentMap<Integer, Deque<ProducerBatch>> batches
  - BuiltInPartitioner builtInPartitioner



  1. 首先在TopicInfo的batches（partition → Deque<ProducerBatch>）Map中找到对应分区的Deque<ProducerBatch>，不存在则创建新的Deque
  2. 对Deque加锁，调用tryAppend尝试向ProducerBatch中追加一条record，然后释放锁
  3. 如果append成功则直接返回；否则会close这个已经满的ProducerBatch（不再接受追加）
  4. 然后在不持有锁的情况下申请ByteBuffer
  5. 再次对Deque加锁，分配一个新的ProducerBatch，将record添加进去，并将该ProducerBatch append到Deque的尾部
  • 这里在append方法中两次分别对Deque加锁，因为ArrayDeque不是线程安全的。为什么要分成两次小粒
    度的加锁呢？另外，为什么使用队列，并且队列中的每个元素（ProducerBatch）本身又是一批records呢？
  • 考虑有两个线程同时send request，并假设只有一次大粒度的加锁：线程1发送的消息比较大，需要向
    BufferPool申请新空间，而此时BufferPool空间不足，线程1在BufferPool上等待，此时它依然持有对应Deque的锁；线程2发送的消息较小，Deque最后一个ProducerBatch的剩余空间够用，但由于线程1未释放Deque的锁，线程2也只能一起等待。若有多个发送小消息的线程，就会因为等待BufferPool分配内存而阻塞其他请求，从而降低吞吐量。而现在Kafka的设计是只在tryAppend和最后的appendNewBatch时持有锁，在中间allocate buffer时不持有锁，这是一个很好的设计。
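  • 并且，第二次加锁后（appendNewBatch中），如果没有真正新建ProducerBatch（appendResult.newBatchCreated为false），申请到的buffer不会被占用，而会被归还给BufferPool。这样可以防止多个线程并发向BufferPool申请空间后，各自新建未填满的batch而造成内部碎片。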


  • Sender


          // Excerpt from the accumulator's readiness check: a partition's data is considered
          // sendable when ANY of the flags below holds and the partition is not backing off.
          // The definitions of the individual flags are not shown in this excerpt.
          boolean sendable = full
                || expired
                || exhausted
                || closed
                || flushInProgress()
                || transactionCompleting;
        // (body elided in this excerpt)
        if (sendable && !backingOff) 
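  1. Deque中有多个ProducerBatch，或第一个ProducerBatch已经满了（full）
  2. 是否超时（expired）
  3. 是否有线程正在等待flush操作完成（flushInProgress()）
  4. Sender线程准备关闭（closed）
  5. BufferPool内存已耗尽，有线程在等待分配内存（exhausted）
  6. 事务正在完成中（transactionCompleting）
  以上任一条件成立，且该分区当前不处于backoff状态（!backingOff）时，数据才可发送。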

满足上述条件后，sendProducerData会将处于ready状态的数据发送给Broker：

// Excerpt of Sender.sendProducerData: one iteration of collecting ready batches and sending them.
// Visible steps:
//   - ask the accumulator which nodes have ready data;
//   - re-add topics whose partition leaders are unknown to the metadata;
//   - drop nodes the network client is not ready to send to (tracking their poll delay);
//   - drain batches per node;
//   - when message order is guaranteed, mute the drained partitions;
//   - fail batches that expired in the accumulator;
//   - compute the poll timeout;
//   - send the produce requests.
// Returns the poll timeout in ms; it is 0 when some node has data ready to send.
// Several closing braces and statement bodies are elided in this excerpt.
private long sendProducerData(long now) {
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadata, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            // (log arguments and the metadata update request are elided in this excerpt)
            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                // Update just the readyTimeMs of the latency stats, so that it moves forward
                // every time the batch is ready (then the difference between readyTimeMs and
                // drainTimeMs would represent how long data is waiting for the node).
                this.accumulator.updateNodeLatencyStats(node.id(), now, false);
                // Remember the soonest time a not-ready node may become ready.
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
                // (per the comment above the loop, the node is presumably removed via the
                // iterator here; that statement is elided in this excerpt)
            } else {
                // Update both readyTimeMs and drainTimeMs, this would "reset" the node
                // latency.
                this.accumulator.updateNodeLatencyStats(node.id(), now, true);

        // create produce requests
        // Drained batches are grouped by node id and limited by maxRequestSize.
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadata, result.readyNodes, this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                // (the mute call in the loop body is elided in this excerpt)
                for (ProducerBatch batch : batchList)

        // NOTE(review): expiredInflightBatches is not used in the visible excerpt; the full source
        // presumably merges it into expiredBatches before they are failed — confirm.
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                    + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                // (body elided in this excerpt)

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        // Clamp: the expiry term above can be negative once a batch is already past its expiry time.
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        sendProduceRequests(batches, now);
        return pollTimeout;

