Kafka ソースコード サーバーの起動プロセスの図解

Kafka ソースコード サーバーの起動プロセスの図解

これまでの「8」回の記事では、「シナリオ主導のアプローチ」を使用して、Kafka「ログシステム」のソースコードアーキテクチャ設計のあらゆる側面を深く分析しました。本日より、Kafka「コントローラー」の基礎となるソースコード実装を詳細に分析します。これはコントローラー シリーズの最初の記事です。戻って、「Kafka サーバーの起動プロセス」についてさらに詳しく説明し、Kafka サーバーがどのように起動されるかを見てみましょう。

I. 全体概要

Kafka の「コントローラー」について詳しく説明する前に、次のような疑問が多少はあると思います。

Kafka サーバーのコンポーネントは何ですか? また、これらのコンポーネントを起動するために使用されるクラスは何ですか?

ここで、Kafka を理解し始めます。ご存知のとおり、次のコマンドを実行することで Kafka を起動できます

 # 1、启动kafka 服务命令: bin/kafka-server-start.sh config/server.properties &

そこで今日は、このスクリプト KafkaServer によってどのコンポーネントが初期化されるかを見てみましょう。

2. kafka-server-start.sh

次のようにシェルの内容を見てみましょう。

 #!/bin/bash # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 1、注释说明该脚本的版权信息和使用许可。 if [ $# -lt 1 ]; then echo "USAGE: $0 [-daemon] server.properties [--override property=value]*" exit 1 fi # 2、检查命令行参数的个数,若小于1 则输出脚本的使用方法并退出。 base_dir=$(dirname $0) # 3、获取当前脚本所在目录的路径,并将其赋值给base_dir 变量。 if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then export KAFKA_LOG4J_OPTS="-Dlog4j.cnotallow=file:$base_dir/../config/log4j.properties" fi # 4、检查KAFKA_LOG4J_OPTS 环境变量是否设置,若未设置则设置该变量的值。 if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" export JMX_PORT="9999" export JMX_RMI_PORT="10000" fi # 5、检查KAFKA_HEAP_OPTS 环境变量是否设置,若未设置则设置该变量的值,并设置JMX_PORT 和JMX_RMI_PORT 环境变量的值,将EXTRA_ARGS 变量的值设置为字符串-name kafkaServer -loggc。 EXTRA_ARGS=${EXTRA_ARGS-'-name kafkaServer -loggc'} # 6、检查命令行参数中COMMAND 变量的值是否为-daemon,若是则将EXTRA_ARGS 变量的值添加-daemon 选项。同时将命令行参数向左移一位,即从$2 开始计算参数。 COMMAND=$1 case $COMMAND in -daemon) EXTRA_ARGS="-daemon "$EXTRA_ARGS shift ;; *) ;; esac # 7、调用$base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中"@ 代表传递的为命令行参数。具体执行的封装在Kafka 客户端库中的kafka.Kafka 类。整个脚本的作用是启动Kafka 服务。 exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@" esac # 7、调用$base_dir/kafka-run-class.sh 脚本并传递相应的参数。其中"@ 代表传递的为命令行参数。具体执行的封装在Kafka 客户端库中的kafka.Kafka 类。整个脚本的作用是启动Kafka 服务。 exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

ここでは、Kafka クライアント ライブラリにカプセル化された kafka.Kafka クラスによって最下層に実装される「ステップ 7」に焦点を当てます。次に、このクラスが何を行うかを見てみましょう。

3. kafka.Kafka クラス

「Kafka.scala」クラスのソースコードは、Kafka ソースコード パッケージのコア パッケージにあります。具体的な github ソースコードの場所は次のとおりです。

https://github.com/apache/kafka/blob/2.8.0/core/src/main/scala/kafka/Kafka.scala。

全体として、このクラスには 3 つのメソッドしかなく、比較的シンプルです。要点を見てみましょう。

ここでは「2.8.x」バージョンを使用して説明します。 「2.7.x」では、KafkaRaftServer クラスはまだ追加されていません。

1. getPropsFromArgs

 def getPropsFromArgs(args: Array[String]): Properties = { // 创建一个命令行参数解析器val optionParser = new OptionParser(false) // 定义--override 选项,用于覆盖server.properties 文件中的属性val overrideOpt = optionParser.accepts("override", "Optional property that should override values set in server.properties file") .withRequiredArg() .ofType(classOf[String]) // 定义--version 选项,用于打印版本信息并退出optionParser.accepts("version", "Print version information and exit.") // 若没有提供参数或者参数包含--help 选项,则打印用法并退出if (args.length == 0 || args.contains("--help")) { CommandLineUtils.printUsageAndDie(optionParser, "USAGE: java [options] %s server.properties [--override property=value]*".format(classOf[KafkaServer].getSimpleName())) } // 若参数中包含--version 选项,则打印版本信息并退出if (args.contains("--version")) { CommandLineUtils.printVersionAndDie() } // 加载server.properties 文件中的属性到Properties 对象中val props = Utils.loadProps(args(0)) // 若提供了其他参数,则解析这些参数if (args.length > 1) { // 解析参数中的选项和参数值val options = optionParser.parse(args.slice(1, args.length): _*) // 检查是否有非选项参数if (options.nonOptionArguments().size() > 0) { CommandLineUtils.printUsageAndDie(optionParser, "Found non argument parameters: " + options.nonOptionArguments().toArray.mkString(",")) } // 将解析得到的选项和参数值添加到props 对象中props ++= CommandLineUtils.parseKeyValueArgs(options.valuesOf(overrideOpt).asScala) } // 返回解析得到的属性集合props }

この関数は、コマンド ライン パラメーターから設定された属性を解析するために使用されます。内部的には OptionParser ライブラリを使用してコマンドライン オプションを解析し、server.properties ファイルからプロパティを読み込みます。

オーバーライド オプションが指定されている場合は、server.properties ファイル内の対応するプロパティがオーバーライドされます。この関数は、解析されたプロパティを含む Properties オブジェクトを返します。

正しいコマンドライン引数が指定されていない場合、または --help または --versionオプションが指定されている場合、関数はヘルプ情報またはバージョン情報を出力して終了します。

2. ビルドサーバー

private def buildServer(props: Properties): Server = { val config = KafkaConfig.fromProps(props, false) // 直接启动定时任务、网络层、请求处理层if (config.requiresZookeeper) { new KafkaServer( config, Time.SYSTEM, threadNamePrefix = None, enableForwarding = false ) } else { // 调用BrokerServer 等来启动网络层和请求处理层new KafkaRaftServer( config, Time.SYSTEM, threadNamePrefix = None ) } }

Kafka 2.8.x で raft プロトコルが追加された後、BrokerServer と ControllServer は別々のファイルを使用してネットワーク層とリクエスト処理層を起動しました。 zk メソッドが引き続き使用される場合、KafkaServer はネットワーク層とリクエスト処理層を開始します。

3. メイン

# 2.7.x 版本源码def main(args: Array[String]): Unit = { try { // 1、解析命令行参数,获得属性集合val serverProps = getPropsFromArgs(args) // 2、从属性集合创建KafkaServerStartable 对象val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps) try { // 如果不是Windows 操作系统,并且不是IBM JDK,则注册LoggingSignalHandler if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk) new LoggingSignalHandler().register() } catch { // 如果注册LoggingSignalHandler 失败,则在日志中打印警告信息case e: ReflectiveOperationException => warn("Failed to register optional signal handler that logs a message when the process is terminated " + s"by a signal. Reason for registration failure is: $e", e) } // 3、添加shutdown hook,用于在程序结束时执行KafkaServerStartable 的shutdown 方法Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown()) // 4、启动KafkaServerStartable 实例kafkaServerStartable.startup() // 5、等待KafkaServerStartable 实例终止kafkaServerStartable.awaitShutdown() } catch { // 如果有异常发生,则记录日志并退出程序case e: Throwable => fatal("Exiting Kafka due to fatal exception", e) Exit.exit(1) } // 6、正常终止程序Exit.exit(0) }

この関数は、Kafka サービス プロセスのエントリ ポイントであり、Kafka 操作プロセス全体のドライバーです。この関数は、まずコマンドライン引数を解析し、getPropsFromArgs 関数を呼び出してプロパティのコレクションを取得し、次にこれらのプロパティを使用して KafkaServerStartable インスタンスを作成します。次に、プログラム終了時に KafkaServerStartable のシャットダウン メソッドを実行するためのシャットダウン フックを登録します。次に、KafkaServerSタータブル インスタンスを起動し、終了するまで待機します。例外が発生した場合は、それをログに記録してプログラムを終了します。最後に、関数は Exit.exit メソッドを呼び出してプログラムを終了し、正常終了を示す 0 を返します。

 # 2.8.x 版本def main(args: Array[String]): Unit = { // 获取Kafka服务的配置信息val serverProps = getPropsFromArgs(args) // 根据配置信息构建Kafka服务val server = buildServer(serverProps) try { // 注册用于记录日志的信号处理器(若实现失败则退出) if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk) new LoggingSignalHandler().register() } catch { case e: ReflectiveOperationException => warn("Failed to register optional signal handler that logs a message when the process is terminated " + s"by a signal. Reason for registration failure is: $e", e) } // 挂载关闭处理器,用于捕获终止信号和常规终止请求Exit.addShutdownHook("kafka-shutdown-hook", { try server.shutdown() // 关闭Kafka服务catch { case _: Throwable => fatal("Halting Kafka.") // 日志记录致命错误信息// 调用Exit.halt()强制退出,避免重复调用Exit.exit()引发死锁Exit.halt(1) } }) try server.startup() // 启动Kafka服务catch { case _: Throwable => // 调用Exit.exit()设置退出状态码,KafkaServer.startup()会在抛出异常时调用shutdown() fatal("Exiting Kafka.") Exit.exit(1) } server.awaitShutdown() // 等待Kafka服务关闭Exit.exit(0) // 调用Exit.exit()设置退出状态码}

ここで最も重要なのは、「ステップ 4」、つまり kafkaServerStartable.startup() または server.startup() を呼び出して Kafka を起動することです。

ここでは、引き続き「ZK モード」で開始し、時間があるときに「Raft モード」の起動を補足します。

Kafkaサーバー起動可能

「KafkaServerStartable.scala」クラスのソースコードは、Kafka ソースコード パッケージのコア パッケージにあります。具体的な github ソースコードの場所は次のとおりです。

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServerStartable.scala。

Scala では、ソース コード ファイル内で同じ名前のクラスとオブジェクトを定義することをコンパニオンと呼びます。 Class オブジェクトはコンパニオン クラスと呼ばれ、Java のクラスと同じです。一方、Object オブジェクトは、いくつかの静的変数または静的メソッドを保存するために使用されるシングルトン オブジェクトです。

ここでは主に Class クラスコードを見ていきます。

 class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter], threadNamePrefix: Option[String] = None) extends Logging { // 创建KafkaServer 实例// 构造函数有两个参数—— staticServerConfig 表示静态服务器配置,reporters 表示Kafka 指标报告器。如果threadNamePrefix 参数未用于构造函数,则默认值为None。threadNamePrefix 参数表示线程名称前缀,用于调试和维护目的。 private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters, threadNamePrefix = threadNamePrefix) def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty) // 启动KafkaServer // startup 方法尝试启动Kafka 服务器。如果启动Kafka 服务器时发生异常,则记录一条fatal 错误日志并退出程序。对于成功启动的Kafka 服务器,它将开始监听客户端连接,并在收到消息时执行所需的操作。 def startup(): Unit = { try server.startup() catch { // 如果出现异常,则记录日志并退出程序case _: Throwable => // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code fatal("Exiting Kafka.") Exit.exit(1) } } // 关闭KafkaServer // shutdown 方法尝试停止Kafka 服务器。如果在停止服务器时出现异常,则记录一条fatal 错误日志并强制退出程序。调用shutdown 方法后,服务器将不再接受新的请求,并开始等待当前进行中的请求完成。当所有处理中的请求都完成后,服务器将彻底停止。 def shutdown(): Unit = { try server.shutdown() catch { // 如果出现异常,则记录日志并强制退出程序case _: Throwable => fatal("Halting Kafka.") // Calling exit() can lead to deadlock as exit() can be called multiple times. Force exit. Exit.halt(1) } } // setServerState 方法允许从KafkaServerStartable 对象中设置broker 状态。如果自定义KafkaServerStartable 对象想要引入新的状态,则此方法很有用。 def setServerState(newState: Byte): Unit = { server.brokerState.newState(newState) } // 等待KafkaServer 退出// awaitShutdown 方法等待Kafka 服务器完全退出。在Kafka 服务器执行shutdown 方法后,它将不再接受新的请求。但是,服务器可能仍在处理一些已经接收的请求。awaitShutdown 方法将阻塞当前线程,直到服务器彻底停止。 def awaitShutdown(): Unit = server.awaitShutdown() }

KafkaServerStartable クラスは、起動および停止できる Kafka サーバーです。クラス内の server メンバー変数は KafkaServer クラスのインスタンスであり、KafkaServerStartable クラス オブジェクトが起動されたときに作成されます。このクラスは、Kafka サーバーを起動および停止するためのメソッド、ブローカーのステータスを設定して Kafka サーバーの終了を待機するためのメソッドを提供します。

この記事に関連するのは、起動するために KafkaServer#startup メソッドを呼び出す「startup」メソッドです。

5. KafkaServer クラス

Kafka クラスターは複数のブローカー ノードで構成され、各ブローカー ノードは Kafka インスタンスを実行します。これらのインスタンスは ZK に基づいて相互に検出され、クラスター コントローラー KafkaController によって調整されます。それらはソケット接続に基づいて相互に通信します。

「KafkaServer.scala」クラスのソースコードは、Kafka ソースコード パッケージのコア パッケージにあります。具体的な github ソースコードの場所は次のとおりです。

https://github.com/apache/kafka/blob/2.7.0/core/src/main/scala/kafka/server/KafkaServer.scala。

KafkaServer は Kafka のスタートアップ クラスであり、KafkaController、groupCoordinator、replicaManager など、Kafka のすべてのコンポーネントが含まれています。

 class KafkaServer(val config: KafkaConfig, //配置信息time: Time = Time.SYSTEM, threadNamePrefix: Option[String] = None, kafkaMetricsReporters: Seq[KafkaMetricsReporter] = List() //监控上报) extends Logging with KafkaMetricsGroup { //标识节点已经启动完成private val startupComplete = new AtomicBoolean(false) //标识节点正在执行关闭操作private val isShuttingDown = new AtomicBoolean(false) //标识节点正在执行启动操作private val isStartingUp = new AtomicBoolean(false) //阻塞主线程等待KafkaServer 的关闭private var shutdownLatch = new CountDownLatch(1) //日志上下文private var logContext: LogContext = null var metrics: Metrics = null //记录节点的当前状态val brokerState: BrokerState = new BrokerState //API接口类,用于处理数据类请求var dataPlaneRequestProcessor: KafkaApis = null //API接口,用于处理控制类请求var controlPlaneRequestProcessor: KafkaApis = null //权限管理var authorizer: Option[Authorizer] = None //启动socket,监听9092端口,等待接收客户端请求var socketServer: SocketServer = null //数据类请求处理线程池var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null //命令类处理线程池var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null //日志管理器var logDirFailureChannel: LogDirFailureChannel = null var logManager: LogManager = null //副本管理器var replicaManager: ReplicaManager = null //topic增删管理器var adminManager: AdminManager = null //token管理器var tokenManager: DelegationTokenManager = null //动态配置管理器var dynamicConfigHandlers: Map[String, ConfigHandler] = null var dynamicConfigManager: DynamicConfigManager = null var credentialProvider: CredentialProvider = null var tokenCache: DelegationTokenCache = null //分组协调器var groupCoordinator: GroupCoordinator = null //事务协调器var transactionCoordinator: TransactionCoordinator = null //集群控制器var kafkaController: KafkaController = null //定时任务调度器var kafkaScheduler: KafkaScheduler = null //集群分区状态信息缓存var metadataCache: MetadataCache = null //配额管理器var quotaManagers: QuotaFactory.QuotaManagers = null //zk客户端配置val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig()) private var _zkClient: KafkaZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" val brokerMetadataCheckpoints = config.logDirs.map(logDir => (logDir, new BrokerMetadataCheckpoint(new File(logDir + File.separator + brokerMetaPropsFile)))).toMap private var _clusterId: String = null private var _brokerTopicStats: BrokerTopicStats = null def clusterId: String = _clusterId // Visible for testing private[kafka] def zkClient = _zkClient private[kafka] def brokerTopicStats = _brokerTopicStats .... }

1. スタートアップ

このタイプの方法はたくさんあります。ここでは、この記事の冒頭で提起した問題を解決するために、起動メソッドのみを調べて、その中でどのコンポーネントが起動されるかを確認します。

 /** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup(): Unit = { try { info("starting") // 是否已关闭if (isShuttingDown.get) throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!") // 是否已启动if (startupComplete.get) return // 是否可以启动val canStartup = isStartingUp.compareAndSet(false, true) if (canStartup) { // 设置broker状态为Starting brokerState.newState(Starting) /* setup zookeeper */ // 连接ZK,并创建根节点initZkClient(time) /* initialize features */ _featureChangeListener = new FinalizedFeatureChangeListener(featureCache, _zkClient) if (config.isFeatureVersioningSupported) { _featureChangeListener.initOrThrow(config.zkConnectionTimeoutMs) } /* Get or create cluster_id */ // 从ZK获取或创建集群id,规则:UUID的mostSigBits、leastSigBits组合转base64 _clusterId = getOrGenerateClusterId(zkClient) info(s"Cluster ID = $clusterId") /* load metadata */ // 获取brokerId及log存储路径,brokerId通过zk生成或者server.properties配置broker.id // 规则:/brokers/seqid的version值+ maxReservedBrokerId(默认1000),保证唯一性val (preloadedBrokerMetadataCheckpoint, initialOfflineDirs) = getBrokerMetadataAndOfflineDirs /* check cluster id */ if (preloadedBrokerMetadataCheckpoint.clusterId.isDefined && preloadedBrokerMetadataCheckpoint.clusterId.get != clusterId) throw new InconsistentClusterIdException( s"The Cluster ID ${clusterId} doesn't match stored clusterId ${preloadedBrokerMetadataCheckpoint.clusterId} in meta.properties. " + s"The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong.") /* generate brokerId */ config.brokerId = getOrGenerateBrokerId(preloadedBrokerMetadataCheckpoint) logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ") // 配置logger this.logIdent = logContext.logPrefix // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be // applied after DynamicConfigManager starts. // 初始化AdminZkClient,支持动态修改配置config.dynamicConfig.initialize(zkClient) /* start scheduler */ // 初始化定时任务调度器kafkaScheduler = new KafkaScheduler(config.backgroundThreads) kafkaScheduler.startup() /* create and configure metrics */ // 创建及配置监控,默认使用JMX及Yammer Metrics kafkaYammerMetrics = KafkaYammerMetrics.INSTANCE kafkaYammerMetrics.configure(config.originals) val jmxReporter = new JmxReporter() jmxReporter.configure(config.originals) val reporters = new util.ArrayList[MetricsReporter] reporters.add(jmxReporter) val metricConfig = KafkaServer.metricConfig(config) val metricsContext = createKafkaMetricsContext() metrics = new Metrics(metricConfig, reporters, time, true, metricsContext) /* register broker metrics */ _brokerTopicStats = new BrokerTopicStats // 初始化配额管理器quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse("")) notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala) // 用于保证kafka-log数据目录的存在logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) /* start log manager */ // 启动日志管理器,kafka的消息以日志形式存储logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel) // 启动日志清理、刷新、校验、恢复等的定时线程logManager.startup() metadataCache = new MetadataCache(config.brokerId) // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update. // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically. // SCRAM认证方式的token缓存tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames) credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache) // Create and start the socket server acceptor threads so that the bound port is known. // Delay starting processors until the end of the initialization sequence to ensure // that credentials have been loaded before processing authentications. // 启动socket,监听9092端口,等待接收客户端请求socketServer = new SocketServer(config, metrics, time, credentialProvider) socketServer.startup(startProcessingRequests = false) /* start replica manager */ brokerToControllerChannelManager = new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix) // 启动副本管理器,高可用相关replicaManager = createReplicaManager(isShuttingDown) replicaManager.startup() brokerToControllerChannelManager.start() // 将broker信息注册到ZK上val brokerInfo = createBrokerInfo val brokerEpoch = zkClient.registerBroker(brokerInfo) // Now that the broker is successfully registered, checkpoint its metadata // 校验broker 信息checkpointBrokerMetadata(BrokerMetadata(config.brokerId, Some(clusterId))) /* start token manager */ // 启动token 管理器tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient) tokenManager.startup() /* start kafka controller */ // 启动Kafka控制器,只有Leader 会与ZK建连kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix) kafkaController.startup() // admin管理器adminManager = new AdminManager(config, metrics, metadataCache, zkClient) /* start group coordinator */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue // 启动集群群组协调器groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics) groupCoordinator.startup() /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */ // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue // 启动事务协调器transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM) transactionCoordinator.startup() /* Get the authorizer and initialize it if one is specified.*/ // ACL authorizer = config.authorizer authorizer.foreach(_.configure(config.originals)) val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match { case Some(authZ) => authZ.start(brokerInfo.broker.toServerInfo(clusterId, config)).asScala.map { case (ep, cs) => ep -> cs.toCompletableFuture } case None => brokerInfo.broker.endPoints.map { ep => ep.toJava -> CompletableFuture.completedFuture[Void](null) }.toMap } // 创建拉取管理器val fetchManager = new FetchManager(Time.SYSTEM, new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots, KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ // 初始化数据类请求的KafkaApis,负责数据类请求逻辑处理dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) // 初始化数据类请求处理的线程池dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix) socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => // 初始化控制类请求的KafkaApis controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache) // 初始化控制类请求的线程池controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, 1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix) } Mx4jLoader.maybeLoad() /* Add all reconfigurables for config change notification before starting config handlers */ config.dynamicConfig.addReconfigurables(this) /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, kafkaController), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider), ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers)) // Create the config manager. start listening to notifications // 启动动态配置处理器dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() // 启动请求处理线程socketServer.startProcessingRequests(authorizerFutures) // 更新broker状态brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) isStartingUp.set(false) AppInfoParser.registerAppInfo(metricsPrefix, config.brokerId.toString, metrics, time.milliseconds()) info("started") } } catch { case e: Throwable => fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e) isStartingUp.set(false) shutdown() throw e } }

このメソッドによって起動されるコンポーネントの概要は次のとおりです。

  • initZkClient(time) は Zk を初期化します。
  • kafkaScheduler タイマー。
  • logManager ログモジュール。
  • MetadataCache メタデータ キャッシュ。
  • socketServer ネットワーク サーバー。
  • replicaManager レプリカ モジュール。
  • kafkaController コントローラー。
  • groupCoordinatorコーディネーターはConsumerCoordinatorと対話するために使用されます
  • transactionCoordinator トランザクション関連
  • fetchManager レプリカ フェッチ マネージャー。
  • dynamicConfigManager 動的構成マネージャー。

2. ブローカーステータス

これはバージョン 2.7.x より前の状態であり、バージョン 2.8.x 以降でリファクタリングされました。

 sealed trait BrokerStates { def state: Byte } case object NotRunning extends BrokerStates { val state: Byte = 0 } case object Starting extends BrokerStates { val state: Byte = 1 } case object RecoveringFromUncleanShutdown extends BrokerStates { val state: Byte = 2 } case object RunningAsBroker extends BrokerStates { val state: Byte = 3 } case object PendingControlledShutdown extends BrokerStates { val state: Byte = 6 } case object BrokerShuttingDown extends BrokerStates { val state: Byte = 7 }
  • NotRunning : 現在のブローカー ノードが実行されていないことを示す初期状態。
  • 起動中: 現在のブローカー ノードが起動中であることを示します。
  • RecoveringFromUncleanShutdown : 現在のブローカー ノードが最後の異常シャットダウンから回復中であることを示します。
  • RunningAsBroker : 現在のブローカー ノードが正常に起動し、外部サービスを提供できることを示します。
  • PendingControlledShutdown : 現在のブローカー ノードが制御されたシャットダウン操作の完了を待機していることを示します。
  • BrokerShuttingDown : 現在のブローカー ノードがシャットダウン操作を実行していることを示します。

これらは、KafkaServer のメイン モジュールへの入り口です。以下の記事では、それらを一つずつ分析していきます。

VI.結論

ここで、この記事の要点をまとめてみましょう。

  • 記事の冒頭では、「kafka-server-start.sh」の内容を解析することで、「kafka.Kafka」クラスを紹介しました。
  • 「kafka.Kafka」のメインメソッドでは、「KafkaServerStartable」が呼び出され、Kafka サーバーの起動が試行されます。
  • 次に、「kafkaserverstartable」のスタートアップ方法では、「kafkaserver」のスタートアップ方法が呼び出され、サーバーが必要とするさまざまなコンポーネントクラスを開始します。

次の記事では、「クラスターを開始するときにブローカーがどのように知覚するか」を詳細に分析します。どうぞお楽しみに。また次回お会いしましょう。

<<:  Dockerの始め方からプロジェクトのデプロイまでお話しましょう

>>:  エッジコンピューティング: 産業の最前線で働く人々にとって強力な手段

推薦する

高帯域幅、低遅延、高可用性を備えたJigoo Technologyは、オーディオおよびビデオ分野で高品質のネットワークを構築します。

4Kや8Kなどの超高精細ビデオ規格の急速な普及に伴い、オーディオとビデオのデータ量が急増し、ストレー...

Mob Lin Rongbo: データファクトリーアーキテクチャのアップグレードについて再考

[51CTO.comより引用] 2018年5月18日〜19日、51CTO主催のグローバルソフトウェア...

チャンネル運営戦略、トラフィックを集めるのは難しくない

多くの人が、なぜチャネル運営に多額の費用をかけているのに、結局効果がないのかと疑問に思うでしょう。広...

テンセントトラベルサービス、誰もが安全に旅行できるよう、流行中の旅行ポリシーのリアルタイムクエリを開始

最近、国内の多くの地域で流行状況が再発しています。春節の旅行ラッシュが近づいており、流行中の旅行政策...

BandwagonHost CN2vpsはどうですか? BandwagonHost CN2の簡単なレビュー

私は BandwagonHost から CN2 回線付きの VPS を入手しました。これは月額払いで...

ウェブサイトの運用に影響を与える決定的な要因は何ですか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています作戦に関し...

検索エンジンの観点からウェブサイトの最適化手法を分析

月給5,000~50,000のこれらのプロジェクトはあなたの将来です本日、小小科堂 SEO 独習ネッ...

alwyzon: 月額 3.32 ユーロ、オランダ VPS、4G メモリ/2 コア/40g SSD/5T トラフィック、カスタムアップロード ISO

alwyzon(Hohl IT eUのブランド)は、オランダ(データセンターはオランダ東部の都市アペ...

ウェブマスターが検索エンジンの変更からガイダンスのアイデアを得る方法

今日、検索エンジンに新しいウェブサイトを申請しました。このウェブサイトの誕生は、私が慎重に検討した結...

Google 特許におけるドメイン名の重みから見るドメイン名の SEO への影響

2003 年 12 月 31 日、Google は、図に示すように、「履歴データに基づく情報検索」と...

オンラインソフト記事の執筆と配信スキルの詳細な説明

現在、多くの企業がオンライン マーケティングに注目し始めています。どのようなマーケティング手法を使用...

App.net が 50 万ドルを調達: 広告なしの純粋なソーシャル ネットワーキング

App.net が 50 万ドルを調達: 広告なしの純粋なソーシャル ネットワーキング新しいソーシャ...

ウェブサイトのコンテンツが重複するいくつかの状況と解決策

重複したウェブサイトコンテンツは、Baidu K-station の主な原因の 1 つですが、重複し...

Baidu が共有する水の深さはどれくらいですか?

2012年の初めに、Baidu は Baidu Share 機能を開始しました。 Baidu のこの...

電子商取引代理店業界は資本化の「黄金時代」を先導したのでしょうか?

昨年のダブル11では、テンセントテクノロジーと共同で電子商取引代理業務に関するシリーズの最初の解釈を...