Kafka-Producer

  Nowadays, when maintaining a fleet of microservices, what we think about most is the availability of the distributed system (CAP). As the data-intensive-application literature describes, a distributed system has to deal with data replication, data partitioning, and distributed transactions. This section studies Kafka's multi-partition replication to learn the general concerns that data replication has to handle.

  • Producer

  The producer client calls the send method to deliver messages to the Broker hosting the Topic, where consumers later consume them. The overall flow is walked through piece by piece below.

  The KafkaProducer constructor initializes the serializer, metadata, accumulator, and sender:

KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time) {
    try {
        this.partitioner = config.getConfiguredInstance(...);
        this.keySerializer = config.getConfiguredInstance(...);
        this.valueSerializer = config.getConfiguredInstance(...);

        List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.createConfiguredInterceptors(...);
        this.interceptors = new ProducerInterceptors<>(interceptorList);

        this.accumulator = new RecordAccumulator(...);
        this.metadata = new ProducerMetadata(...);

        // the sender runs in a dedicated I/O thread
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
    }
    // catch block (resource cleanup + wrapping in KafkaException) omitted
}
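
  The construction above is driven entirely by ProducerConfig. As a minimal, hedged sketch of how user code ends up in this constructor (the bootstrap address and the String serializers are illustrative assumptions, not taken from the source):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerFactory {
    // Assumed broker address, for illustration only.
    private static final String BOOTSTRAP = "localhost:9092";

    public static Producer<String, String> newStringProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP);
        // These entries are what the constructor turns into keySerializer / valueSerializer.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // The same config also drives the RecordAccumulator, ProducerMetadata and the
        // Sender ioThread started at the end of the constructor.
        return new KafkaProducer<>(props);
    }
}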

  Let's look at the overall call flow of KafkaProducer's send(record, callback), which delegates to doSend:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
          // Append callback takes care of the following:
          // - call interceptors and user callback on completion
          // - remember partition that is calculated in RecordAccumulator.append
          AppendCallbacks appendCallbacks = new AppendCallbacks(callback, this.interceptors, record);

          try {
              throwIfProducerClosed();
              // first make sure the metadata for the topic is available
              long nowMs = time.milliseconds();
              ClusterAndWaitTime clusterAndWaitTime;
              try {
                  clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
              } catch (KafkaException e) {
                  ...
              }
              nowMs += clusterAndWaitTime.waitedOnMetadataMs;
              long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
              Cluster cluster = clusterAndWaitTime.cluster;
              byte[] serializedKey;
              try {
                  serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
              } catch (ClassCastException cce) {
                  ...
              }
              byte[] serializedValue;
              try {
                  serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
              } catch (ClassCastException cce) {
                  ...
              }

              // Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION,
              // which means that the RecordAccumulator would pick a partition using built-in logic (which may
              // take into account broker load, the amount of data produced to each partition, etc.).
              int partition = partition(record, serializedKey, serializedValue, cluster);

              Header[] headers = record.headers().toArray();
              long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
              boolean abortOnNewBatch = partitioner != null;

              // Append the record to the accumulator. Note, that the actual partition may be
              // calculated there and can be accessed via appendCallbacks.topicPartition.
              RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
                      serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);
              assert appendCallbacks.getPartition() != RecordMetadata.UNKNOWN_PARTITION;

              if (result.batchIsFull || result.newBatchCreated) {
                  log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), appendCallbacks.getPartition());
                  this.sender.wakeup();
              }
              return result.future;
          } catch (Exception e) {
              // handle exceptions and record the errors:
              // API exceptions are returned in the future,
              // other exceptions are rethrown directly
              ...
          }
  }
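
  From the caller's side, this flow is exercised through the public send API. A minimal, hedged usage sketch (the topic, key and value are made-up placeholders) that relies on the callback instead of blocking on the returned future:

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SendExample {
    public static void sendAsync(Producer<String, String> producer) {
        // Hypothetical topic/key/value, purely for illustration.
        ProducerRecord<String, String> record =
                new ProducerRecord<>("demo-topic", "key-1", "value-1");

        // doSend returns a future immediately; the callback fires once the batch
        // containing this record receives a response from the broker (or fails).
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace();
            } else {
                System.out.printf("acked %s-%d@%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
    }
}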
  • MetaData

  Each Topic has multiple partitions, and the leader replicas of those partitions can be placed on different Brokers in the cluster. Because of broker failures, network problems, and so on, these leader replicas change dynamically, so the Producer must maintain the leader-replica information for every partition (including the network address of the broker hosting the leader, the partition number, etc.) and keep it up to date. When the Producer sends a message to a Topic, it chooses the partition to send to based on the latest per-partition leader information held in the metadata.

  In KafkaProducer, the object that maintains this metadata is ProducerMetadata. It holds a Map<String, Long> topics = new HashMap<>() that tracks which topics the producer cares about and when each entry expires.

  • ProducerMetadata also wraps the underlying MetaData object, which contains:

    MetaData
    - MetadataCache cache
    	- Map<Integer, Node> nodes;
    	- Node controller;
    	- Map<TopicPartition, PartitionMetadata> metadataByPartition;
    	// immutable clone for client read, from metadataByPartition
    	- Cluster clusterInstance;
    	// metadata refresh interval (metadata.max.age.ms)
    - long metadataExpireMs;
    - long lastRefreshMs;  
    - long lastSuccessfulRefreshMs;
    

    PartitionMetadata holds the detailed information about each partition's leader replica:

    PartitionMetadata
     - TopicPartition topicPartition;
     - Optional<Integer> leaderId
     - Optional<Integer> leaderEpoch;
     - List<Integer> replicaIds;
     - List<Integer> inSyncReplicaIds;  
     - List<Integer> offlineReplicaIds;
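
    The same per-partition leader/ISR view is exposed to client code through the public partitionsFor API, which is served from this cached metadata. A minimal, hedged sketch (the printing and the way the producer is obtained are illustrative assumptions, not from the source):

    import java.util.List;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.PartitionInfo;

    public class LeaderInspector {
        // Prints the leader and replica counts for each partition of a topic,
        // mirroring the leaderId / inSyncReplicaIds fields kept in PartitionMetadata.
        public static void printLeaders(Producer<String, String> producer, String topic) {
            List<PartitionInfo> partitions = producer.partitionsFor(topic);
            for (PartitionInfo p : partitions) {
                System.out.printf("partition=%d leader=%s isr=%d replicas=%d%n",
                        p.partition(),
                        p.leader() == null ? "none" : p.leader().idString(),
                        p.inSyncReplicas().length,
                        p.replicas().length);
            }
        }
    }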
    

    Metadata update strategy (waitOnMetadata):

    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
              // add topic to metadata topic list if it is not there already and reset expiry  
              Cluster cluster = metadata.fetch();
    
              if (cluster.invalidTopics().contains(topic))
                  throw new InvalidTopicException(topic);
    
              metadata.add(topic, nowMs);
    
              Integer partitionsCount = cluster.partitionCountForTopic(topic);
              // Return cached metadata if we have it, and if the record's partition is either undefined  
              // or within the known partition range  
              if (partitionsCount != null && (partition == null || partition < partitionsCount))
                  return new ClusterAndWaitTime(cluster, 0);
    
              long remainingWaitMs = maxWaitMs;
              long elapsed = 0;
              // Issue metadata requests until we have metadata for the topic and the requested partition,  
              // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata  
              // is stale and the number of partitions for this topic has increased in the meantime.  
              long nowNanos = time.nanoseconds();
              do {
                  if (partition != null) {
                      log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
                  } else {
                      log.trace("Requesting metadata update for topic {}.", topic);
                  }
                  metadata.add(topic, nowMs + elapsed);
                  int version = metadata.requestUpdateForTopic(topic);
                  sender.wakeup();
                  try {
                      metadata.awaitUpdate(version, remainingWaitMs);
                  } catch (TimeoutException ex) {
                      // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs  
                      ...
                  }
    
                  cluster = metadata.fetch();
                  elapsed = time.milliseconds() - nowMs;
                  if (elapsed >= maxWaitMs) {
                      ...
                  }
    
                  metadata.maybeThrowExceptionForTopic(topic);
                  remainingWaitMs = maxWaitMs - elapsed;
                  partitionsCount = cluster.partitionCountForTopic(topic);
              } while (partitionsCount == null || (partition != null && partition >= partitionsCount));
    
              producerMetrics.recordMetadataWait(time.nanoseconds() - nowNanos);
    
              return new ClusterAndWaitTime(cluster, elapsed);
          }
    
    1. First fetch the cached Cluster (PartitionInfo) via metadata.fetch()
    2. If the topic is new, call requestUpdateForNewTopics, which
    3. sets needPartialUpdate = true,
    4. increments this.requestVersion,
    5. and refreshes the topic's next expiry time:
    6. topics.put(topic, nowMs + metadataIdleMs)
    7. Read the topic's partition count; if it is already cached (and the requested partition is within range), return the cached cluster right away
    8. Otherwise enter the do loop and block until the latest partition data is available
    9. Wake up the sender thread so that it fetches the metadata from the brokers
    10. metadata.awaitUpdate(version, remainingWaitMs) blocks waiting for that metadata version (see the simplified sketch after this list)
    11. Fail with a timeout error, or keep looping until the partition metadata is obtained
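
    The blocking in step 10 is the classic "wait until a shared version advances" pattern. A simplified, hedged sketch of that mechanism (this is not the real Metadata class, just the same monitor-based idea):

    // Simplified illustration of awaitUpdate(version, timeout): readers wait on a
    // monitor until the background sender thread bumps the version after a refresh.
    public class VersionedMetadata {
        private int version = 0;

        // Called by the I/O (sender) thread once a metadata response has been processed.
        public synchronized void update() {
            version++;
            notifyAll();
        }

        // Returns the version the caller should wait to move past.
        public synchronized int requestUpdate() {
            return version;
        }

        // Called by the producing thread; blocks until the version passes lastSeen or the timeout expires.
        public synchronized void awaitUpdate(int lastSeen, long maxWaitMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + maxWaitMs;
            while (version <= lastSeen) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                    throw new RuntimeException("Timed out waiting for metadata update");
                wait(remaining);
            }
        }
    }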
  • RecordAccumulator

  When send is called, the message is not delivered to the Broker right away; it is buffered in RecordAccumulator's ConcurrentMap<String, TopicInfo> topicInfoMap. The producer keeps caching records into the ProducerBatch deques held by each TopicInfo, and the sender thread keeps checking for batches that meet the sending criteria and ships them in bulk. Below is the append method:

    public RecordAppendResult append(String topic,
                                     int partition,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     AppendCallbacks callbacks,
                                     long maxTimeToBlock,
                                     boolean abortOnNewBatch,
                                     long nowMs,
                                     Cluster cluster) throws InterruptedException {

      TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, k -> new TopicInfo(logContext, k, batchSize));

      // We keep track of the number of appending thread to make sure we do not miss batches in
      // abortIncompleteBatches().
      appendsInProgress.incrementAndGet();
      ByteBuffer buffer = null;
      if (headers == null) headers = Record.EMPTY_HEADERS;
      try {
          // Loop to retry in case we encounter partitioner's race conditions.
          while (true) {
              // If the message doesn't have any partition affinity, so we pick a partition based on the broker
              // availability and performance. Note, that here we peek current partition before we hold the
              // deque lock, so we'll need to make sure that it's not changed while we were waiting for the
              // deque lock.
              final BuiltInPartitioner.StickyPartitionInfo partitionInfo;
              final int effectivePartition;
              if (partition == RecordMetadata.UNKNOWN_PARTITION) {
                  partitionInfo = topicInfo.builtInPartitioner
                          .peekCurrentPartitionInfo(cluster);
                  effectivePartition = partitionInfo.partition();
              } else {
                  partitionInfo = null;
                  effectivePartition = partition;
              }

              // Now that we know the effective partition, let the caller know.

              setPartition(callbacks, effectivePartition);

              // check if we have an in-progress batch
              Deque<ProducerBatch> dq = topicInfo.batches
                      .computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());

              synchronized (dq) {
                  // After taking the lock, validate that the partition hasn't changed and retry.
                  if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                      continue;

                  RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);

                  if (appendResult != null) {
                      // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                      boolean enableSwitch = allBatchesFull(dq);

                      topicInfo.builtInPartitioner.updatePartitionInfo(
                              partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);

                      return appendResult;
                  }
              }

              // we don't have an in-progress record batch try to allocate a new batch
              if (abortOnNewBatch) {
                  // Return a result that will cause another call to append.
                  return new RecordAppendResult(null, false, false, true, 0);

              }

              if (buffer == null) {
                  byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
                  int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
                  log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, topic, partition, maxTimeToBlock);
                  // This call may block if we exhausted buffer space.
                  buffer = free.allocate(size, maxTimeToBlock);
                  // Update the current time in case the buffer allocation blocked above.
                  // NOTE: getting time may be expensive, so calling it under a lock
                  // should be avoided.
                  nowMs = time.milliseconds();
              }

              synchronized (dq) {
                  // After taking the lock, validate that the partition hasn't changed and retry.
                  if (partitionChanged(topic, topicInfo, partitionInfo, dq, nowMs, cluster))
                      continue;

                  RecordAppendResult appendResult = appendNewBatch(topic, effectivePartition, dq, timestamp, key, value, headers, callbacks, buffer, nowMs);
                  // Set buffer to null, so that deallocate doesn't return it back to free pool, since it's used in the batch.
                  if (appendResult.newBatchCreated)
                      buffer = null;
                  // If queue has incomplete batches we disable switch (see comments in updatePartitionInfo).
                  boolean enableSwitch = allBatchesFull(dq);
                  topicInfo.builtInPartitioner.updatePartitionInfo(partitionInfo, appendResult.appendedBytes, cluster, enableSwitch);
                  return appendResult;
              }
          }
      } finally {
          free.deallocate(buffer);
          appendsInProgress.decrementAndGet();
      }
  }

TopicInfo holds, per partition, a deque of ProducerBatches containing the records waiting to be sent:

TopicInfo
  - ConcurrentMap<Integer, Deque<ProducerBatch>> batches
  - BuiltInPartitioner builtInPartitioner

The rough implementation is as follows. This write-up does not cover how the underlying ByteBuffer is allocated or how append maintains its internal objects; we only look at the overall approach.

Main logic of RecordAccumulator's append method:

  1. First look up the Deque<ProducerBatch> for the partition in the per-partition batches map, creating a new deque if it does not exist yet
  2. Lock the deque, call tryAppend to try to insert the record into the last ProducerBatch, then release the lock
  3. If the append succeeds, return directly; otherwise the full batch is closed off
  4. Then allocate a ByteBuffer from the BufferPool
  5. Lock the deque again, build a new ProducerBatch from that buffer, add the record to it, and append the batch to the tail of the deque
  • Both append attempts take a lock on the deque separately, because the Deque is not thread-safe. Why split this
    into two fine-grained locking sections, and why use a deque whose elements are themselves batches of records?
  • Consider two threads that both send requests, and suppose there were only one coarse-grained lock. Thread 1 sends a
    large message and has to ask the BufferPool for new space; the pool is short on space, so thread 1 waits on the
    BufferPool while still holding the deque lock. Thread 2 sends a small message, and the last ProducerBatch in the
    deque has enough room left for it, but because thread 1 has not released the deque lock, thread 2 has to wait too.
    With several small-message threads, everything ends up blocked behind one thread's I/O wait, which lowers
    throughput. Kafka's design instead holds the lock only inside tryAppend and inside the final append of the new
    batch, and does not hold it while allocating the buffer in between, which is a nice piece of design (see the
    sketch after this list).
  • Retrying under the second lock also prevents several threads that allocated buffers concurrently from each
    creating a new batch, which would cause internal fragmentation.
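
  A stripped-down, hedged sketch of that two-phase locking pattern (generic types, simulated allocation; this is not the real RecordAccumulator code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Shape of RecordAccumulator.append: lock and try the last batch, unlock,
    // allocate outside the lock, then lock again and re-check before adding a new batch.
    public class TwoPhaseAppend {
        static class Batch {
            private final StringBuilder buf = new StringBuilder();
            private final int capacity;
            Batch(int capacity) { this.capacity = capacity; }
            boolean tryAppend(String record) {
                if (buf.length() + record.length() > capacity) return false; // batch is full
                buf.append(record);
                return true;
            }
        }

        private final Deque<Batch> deque = new ArrayDeque<>();
        private final int batchSize = 1024;

        public void append(String record) {
            // Phase 1: short critical section, try the in-progress batch.
            synchronized (deque) {
                Batch last = deque.peekLast();
                if (last != null && last.tryAppend(record))
                    return;
            }

            // Outside the lock: the potentially slow buffer allocation (simulated here),
            // so small appends on other threads are not blocked behind it.
            Batch fresh = new Batch(batchSize);

            // Phase 2: re-check under the lock, because another thread may have added
            // a batch with free space while we were allocating.
            synchronized (deque) {
                Batch last = deque.peekLast();
                if (last != null && last.tryAppend(record))
                    return; // the freshly allocated batch is simply not used
                fresh.tryAppend(record);
                deque.addLast(fresh);
            }
        }
    }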

  After appending, the producer tries to wake up the sender thread to deliver the client's messages to the Broker. So a client send request does not push the message to the Broker directly; records are first appended into ProducerBatches and then sent to the Broker in batches by the sender thread, which improves throughput.

  • Sender

  Before the client sends messages to the server, it calls RecordAccumulator.ready() to get the set of cluster nodes that are eligible to receive data:

        boolean sendable = full
                || expired
                || exhausted
                || closed
                || flushInProgress()
                || transactionCompleting;
        if (sendable && !backingOff)
            readyNodes.add(leader);
  1. The deque contains more than one ProducerBatch, or the first ProducerBatch is already full (full)
  2. The batch has waited long enough, i.e. its linger/backoff time has elapsed (expired; see the configuration sketch after this list)
  3. Some thread is waiting on the BufferPool or waiting for a flush to complete (exhausted / flushInProgress)
  4. The Sender thread is preparing to close (closed)
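
  These readiness conditions are driven largely by producer configuration: batch.size decides when a batch counts as full, linger.ms decides when a waiting batch is considered sendable, and buffer.memory bounds the BufferPool whose exhaustion also marks nodes as ready. A hedged sketch with purely illustrative values:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BatchingConfig {
        // Illustrative values only, not recommendations; tune for the actual workload.
        public static Properties batchingProps() {
            Properties props = new Properties();
            // "full": a batch becomes sendable once it reaches batch.size bytes.
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024);
            // "expired": a non-full batch becomes sendable after linger.ms.
            props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
            // "exhausted": total BufferPool memory; allocation blocks up to max.block.ms when it runs out.
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);
            return props;
        }
    }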

sendProducerData then sends the ready data to the Brokers:

private long sendProducerData(long now) {
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(metadata, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic, now);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                    result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                // Update just the readyTimeMs of the latency stats, so that it moves forward
                // every time the batch is ready (then the difference between readyTimeMs and
                // drainTimeMs would represent how long data is waiting for the node).
                this.accumulator.updateNodeLatencyStats(node.id(), now, false);
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            } else {
                // Update both readyTimeMs and drainTimeMs, this would "reset" the node
                // latency.
                this.accumulator.updateNodeLatencyStats(node.id(), now, true);
            }
        }

        // create produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(metadata, result.readyNodes, this.maxRequestSize, now);
        addToInflightBatches(batches);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);

        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetIdempotentProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                    + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            failBatch(expiredBatch, new TimeoutException(errorMessage), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }
        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout will be the smaller value between next batch expiry
        // time, and the delay time for checking data availability. Note that the nodes may have data that isn't yet
        // sendable due to lingering, backing off, etc. This specifically does not include nodes with sendable data
        // that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        sendProduceRequests(batches, now);
        return pollTimeout;
    }

  After that, the readyNodes are checked and expired records are removed; the client then sends the requests to the Brokers, and when it receives the responses it invokes the callbacks.

  Splitting the data into multiple batches and completing every response asynchronously through callbacks prevents one large message from blocking the thread, while sending several batches concurrently raises throughput. The same approach is worth considering for an RPC client that sends many messages and wants to keep large messages from blocking the others.
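
  As a closing illustration, a hedged, generic sketch of that pattern outside Kafka: callers enqueue messages and immediately get a CompletableFuture, while a single background flusher drains the queue and completes the futures batch by batch (all names here are invented for illustration; the network call is stubbed out).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Generic "accumulate, then flush in batches on a background thread" client,
// loosely mirroring the accumulator + sender split described above.
public class BatchingRpcClient {
    private record Pending(String message, CompletableFuture<Void> future) {}

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    private final int maxBatch = 100;

    public CompletableFuture<Void> send(String message) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        queue.add(new Pending(message, future)); // the caller never blocks on network I/O
        return future;
    }

    public void start() {
        Thread flusher = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    List<Pending> batch = new ArrayList<>();
                    Pending first = queue.poll(10, TimeUnit.MILLISECONDS); // linger-like wait
                    if (first == null) continue;
                    batch.add(first);
                    queue.drainTo(batch, maxBatch - 1);
                    // One (stubbed) network round trip for the whole batch,
                    // then complete every caller's future.
                    for (Pending p : batch)
                        p.future().complete(null);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "batch-flusher");
        flusher.setDaemon(true);
        flusher.start();
    }
}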