Kafka源码分析01:生产者整体架构

这节课我提供了视频讲解,在源码讲解的基础上增加了原理解析和架构学习的部分,作者本人的B站个人空间如下,可以找到对应的教学视频(教学视频现在已经更新到了生产者,后面的视频还需时间)

https://www.bilibili.com/video/BV1xY4y1a7LV

我们先看一下生产端的架构图:

架构图

Kafka源码分析01:生产者整体架构

生产者主要分为两个线程,分别为KafkaProducer主线程和Sender线程,这两个线程相互配合运行。在实际的使用中开发者只会涉及到KafkaProducer主线程,Sender线程是生产者自动运行的,使用者不会感知到。

两个线程的分工:

  • KafkaProducer主线程:负责具体确定发送主题,分区,消息内容,消息的序列化,等等,这些数据都是需要工程师来设定的,所以主线程要暴露给工程师。
  • Sender线程:Sender线程的功能主要有:与外部保存网络通信,发送消息等功能。Sender线程先从主线程生成的待发送消息集合中获取消息,然后把消息发送出去,这一切对主线程来说都是无感的。

这样做的好处是工程师无需知道消息是如何发送的,只要负责设置发送消息前的数据就可以了,做到了消息的业务层与发送消息行为的解耦。

大体步骤

  1. KafkaProducer主线程发送前的工作:包括配置主题,消息内容等发送数据。
  2. KafkaProducer主线程调用拦截器。
  3. KafkaProducer主线程将消息序列化。
  4. KafkaProducer主线程获取分区器。
  5. 把消息保存在缓存区(RecordAccumulator)中等待发送,RecordAccumulator的大小可以在客户端设置,参数为buffer.memory,默认是32MB。RecordAccumulator为每个分区创建一个双端队列Deque, 消息会根据要发送的分区保存相应的Deque中,Deque中的元素就是ProducerBatch。同时一个ProducerBatch内会有一个或多个消息对象ProducerRecord。这样做的目的是可以做到批量发送,减少网络传输的成本。
  6. 激活Sender线程:当缓冲中有数据的时候主线程会激活Sender线程。
  7. Sender线程从缓存区拿到消息并构造用于网络请求的消息对象:

1)Sender线程会将原本的保存形式变成,Node表示要发送的服务端节点。对于主线程KafkaProducer而言只要关心消息发送到哪个分区,而对于Sender线程只要关心消息是要发送到哪个broker节点的就可以了,相当于做了一次从业务逻辑到IO逻辑的转换。

2)然后,Sender线程会把进一步转化为,这样就可以发送到各个broker节点了。

  1. Sender线程把请求发送到服务端broker之前还会把消息请求保存在InFlightRequests中,InFlightRequests内部数据的保存形式是Map,目的是保存发送出去了但是还没返回的请求。在客户端还可以配置发送到某个NodeId的长度,也就是Map内Deque中Request对象的个数。参数为max.in.flight.requests.per.connection,默认值为5,超过这个值就不能在发送了。同时我们也可以通过这个值判断客户端与某个broker的连接是否ok。因为如果请求积累的太多了,那么我们就可以严重怀疑与这个broker连接的网络问题或broker的负载瓶颈到了了。
  2. Sender线程向Kafka服务端发送消息,服务端返回响应。
  3. Sender线程确认消息发送成功后,会删除缓冲区中的ProducerBatch。

相关源码分析

好,我们来看一下相关的源码:

KafkaProducer.java的构造方法:

KafkaProducer(Map configs,
              Serializer keySerializer,
              Serializer valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors interceptors,
              Time time) {
    ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs, keySerializer,
            valueSerializer));
    try {
        //1.用户自定义参数
        Map userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = time;
        String transactionalId = (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
        //2.获取配置参数。
        this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        LogContext logContext;
        if (transactionalId == null)
            logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
        else
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));

        log = logContext.logger(KafkaProducer.class);
        log.trace("Starting the Kafka producer");


        Map metricTags = Collections.singletonMap("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                .tags(metricTags);
        List reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class,
                Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
        JmxReporter jmxReporter = new JmxReporter();
        jmxReporter.configure(userProvidedConfigs);
        reporters.add(jmxReporter);
        MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
        config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);

        //3.获取分区器。
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

        //4.失败重试的退避时间。默认100ms
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);

        //5.定义key和value的序列化器
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,                                                                      Serializer.class);

            this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,                                                                         Serializer.class);
        this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);

        //6.定义拦截器列表
        List> interceptorList = (List) configWithClientId.getConfiguredInstances(
                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
        if (interceptors != null)
            this.interceptors = interceptors;
        else
            this.interceptors = new ProducerInterceptors<>(interceptorList);
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
                valueSerializer, interceptorList, reporters);
                
        //8.最大请求大小。默认1M,这个值有些小,在实际生产环境中经常会比这个参数大,我们一般设置为10M
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        //9.消息缓冲区大小。默认是32M,如果有特殊的需要我们可以修改
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        //10.获取压缩类型
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));

        //11.最大阻塞耗时。默认1分钟。KafkaProducer'调用send(), partitionsFor(), initTransactions(), sendOffsetsToTransaction(), commitTransaction() and abortTransaction()

        //这些方法一共要消耗的时间,除去事务相关的方法,其实就send(), partitionsFor()两个方法,而这两个方法主要是等待元数据更新造成的阻塞时间。
        this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
        this.apiVersions = new ApiVersions();
        this.transactionManager = configureTransactionState(config, logContext);
        this.accumulator = new RecordAccumulator(logContext,
                config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.compressionType,
                lingerMs(config),
                retryBackoffMs,
                deliveryTimeoutMs,
                metrics,
                PRODUCER_METRIC_GROUP_NAME,
                time,
                apiVersions,
                transactionManager,
                new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));


        List addresses = ClientUtils.parseAndValidateAddresses(
                config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
                config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
        if (metadata != null) {
            //12.metadata包含了kafka集群元素信息,主要有:kafka集群的节点有哪些,有哪些topic
            //每个topic有哪些分区,topic的ISR列表,ISR列表分布在哪些节点上,leader partition在哪些节点上。
            //要想获得metadata需要向集群请求获得。
            this.metadata = metadata;
        } else {
            //构建一个空的metadata对象。
            //重试退避时间,默认100MS
            this.metadata = new ProducerMetadata(retryBackoffMs,
                    //元数据过期时间:默认5分钟
                    config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
                    config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
                    logContext,
                    clusterResourceListeners,
                    Time.SYSTEM);

            //启动metadata服务。
            this.metadata.bootstrap(addresses);
        }

        this.errors = this.metrics.sensor("errors");

        //13.创建Sender类的实例。
        this.sender = newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;

        //14.封装和启动sender线程。
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        close(Duration.ofMillis(0), true);
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
复制代码

KafkaProducer主线程的发送逻辑:

private Future doSend(ProducerRecord record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            //1.等待元数据更新
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;

        //2.序列化key
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }

        byte[] serializedValue;

        //3.序列化 value
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);

        }

        //4.消息路由到分区。
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        
        //5.根据序列化后消息的大小判断是否超过了规定的大小。
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }

        // 6.把回调方法和拦截器组装成一个对象,
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }

        // 7.把消息加到缓冲区中
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
        //对于不能申请新批次的情况就换分区
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            //换分区
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }

            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
            result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);
        // 8.唤醒sender线程。
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
复制代码

Sender线程的发送逻辑:

Sender.java


private long sendProducerData(long now) {
    //1.从缓存中获取元数据
    Cluster cluster = metadata.fetch();

    //2.得到应该发送数据的节点
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);


    //3.如果主题的 leader 分区对应的节点不存在,就要更新元数据
    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic, now);
        log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
            result.unknownLeaderTopics);
        this.metadata.requestUpdate();
    }
    //4.在result返回的node集合的基础上再检查客户端和node。
    Iterator iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        //检查node连接是否可用,并且是否可用往这个节点发送数据
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
        }
    }

    //5.把要发送的消息转换成按节点组织的集合
    Map> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    addToInflightBatches(batches);
    if (guaranteeMessageOrder) {
        for (List batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    accumulator.resetNextBatchExpiryTime();
    //6.收集和处理过期的batch。
    //Sender自定义inflightBatches集合里过期的batch
    List expiredInflightBatches = getExpiredInflightBatches(now);
    //accumulator定义的batches集合里过期的batch
    List expiredBatches = this.accumulator.expiredBatches(now);
    expiredBatches.addAll(expiredInflightBatches);

    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    //7.处理过期的batch
    for (ProducerBatch expiredBatch : expiredBatches) {
        String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
            + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
        failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            transactionManager.markSequenceUnresolved(expiredBatch);
        }
    }
    sensors.updateProduceRequestMetrics(batches);
    // 7.设定pollTimeout

    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
    pollTimeout = Math.max(pollTimeout, 0);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }

    //8.发送消息
    sendProduceRequests(batches, now);
    return pollTimeout;
}
复制代码

架构设计知识

  1. 扩展点

为了提升通用框架的扩展性,要提供 过滤器,拦截器的设计。

  1. 序列化

通用框架必须要有序列化设计,因为必须有一个序列化的过程,key,value有各种各样的类型,要发送出去都要转换为byte[]类型。

  1. 路由器的设计

实现某种路由算法的独立组件给用户使用,同时给用户自定义扩展的空间。

  1. 自定义异常处理体系设计

背景

你在写中间件或底层服务的时候,必然会涉及到自定义的异常。在可能出现java异常的地方,try,catch然后抛出自己定义的异常,打印出自定义异常的信息。

同时,系统性的软件需要一个异常体系。由于系统软件是自己设计的,那么根据自己对软件的了解要预先判断出哪里会出问题,针对这些问题定义一些自定义的异常,出了问题后,可以根据出问题的地方抛出的自定义异常判断出软件出了什么问题。

设计思路

  1. 由于我们的系统软件是分层的,那么我们需要把底层出现的自定义异常往上面抛,一直抛到核心流程控制的逻辑里面。
  2. 对异常封装成状态码,返给最上次的调用者,有利于上层方便判断。
  3. 对异常的情况做一些统计。
  4. 在特殊的异常记录类里记录异常的次数等。

本人在掘金发布了小册,对kafka做了源码级的剖析。

欢迎支持笔者小册:《Kafka 源码精讲》


作者:肖恩Sean
链接:https://juejin.cn/post/7109094563649683493
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

发表评论
留言与评论(共有 0 条评论) “”
   
验证码:

相关文章

推荐文章