[[420379]] 先ほど説明したメッセージ送信シーケンス図を確認してみましょう。前のセクションでは、Kafka に関連するメタデータ情報とメッセージのカプセル化について説明しました。メッセージのカプセル化が完了すると、メッセージが送信されます。このタスクは Sender スレッドによって実装されます。 1. 送信スレッドKafkaProducer オブジェクトを見つけます。 KafkaProducer のコンストラクターには数行のコードがあります。 - this.accumulator = 新規
- レコードアキュムレータ(config.getInt(プロデューサーConfig.BATCH_SIZE_CONFIG)、
- this.totalMemorySize、
- this.compressionType、
- config.getLong(プロデューサーConfig.LINGER_MS_CONFIG)、
- 再試行バックオフMs、
- 指標、
- 時間);
RecordAccumulator オブジェクトが構築され、オブジェクト内の各メッセージ バッチのサイズ、バッファー サイズ、圧縮形式などが設定されます。 次に、メッセージを送信するためのキャリアとして使用される非常に重要なコンポーネント NetworkClient が構築されます。 - NetworkClientクライアント = 新しいNetworkClient(
- 新しいセレクター(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)、this.metrics、 time 、 "producer" 、channelBuilder)、
- this.メタデータ、
- クライアントID、
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)、
- config.getLong(プロデューサーConfig.RECONNECT_BACKOFF_MS_CONFIG)、
- config.getInt(プロデューサー構成.SEND_BUFFER_CONFIG)、
- config.getInt(プロデューサー構成.RECEIVE_BUFFER_CONFIG)、
- this.requestTimeoutMs、時間);
構築された NetworkClient には、注意すべき重要なパラメータがいくつかあります。 √ connections.max.idle.ms: ネットワーク接続がアイドル状態を維持できる最長時間を示します。アイドル時間がこの値を超えると、ネットワーク接続が閉じられます。デフォルト値は 9 分です。 √ max.in.flight.requests.per.connection: 各ネットワーク接続がプロデューサーからブローカーへの応答なしで許容できるメッセージの数を示します。デフォルト値は 5 です。(ps: プロデューサーがブローカーにデータを送信する場合、実際には複数のネットワーク接続が存在します) √ send.buffer.bytes: ソケットがデータを送信するために使用するバッファのサイズ。デフォルト値は 128K です。 √receive.buffer.bytes: ソケットがデータを受信するために使用するバッファのサイズ。デフォルト値は 32K です。 送信スレッドがメッセージを送信し始めるまで、良いニュースを送信するためのネットワーク チャネルを構築します。 - this.sender = 新しいSender(クライアント、
- this.メタデータ、
- this.accumulator、
- config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
- config.getInt(プロデューサー構成.MAX_REQUEST_SIZE_CONFIG)、
- (ショート) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG))、
- config.getInt(プロデューサー構成.RETRIES_CONFIG)、
- this.metrics、
- 新しいSystemTime()、
- クライアントID、
- リクエストタイムアウトMs);
- //デフォルトのスレッド名プレフィックスはkafka-producer-network-threadです。clientIdはプロデューサーのIDです。
- 文字列 ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "" );
- //デーモン スレッドを作成し、Sender オブジェクトをそのスレッドに渡します。
- this.ioThread = 新しい KafkaThread(ioThreadName、this.sender、 true );
- //スレッドを開始する
- スレッドを開始します。
ここは非常に明確です。スレッドなので、run() メソッドが必要です。この方法では実装ロジックに焦点を当てます。ここで、学習する価値のあるスレッドの使用パターンを追加する必要があります。送信スレッドが作成された後、送信スレッドがすぐに開始されず、新しい KafkaThread スレッドが作成され、送信オブジェクトがそれに渡されてから KafkaThread スレッドが開始されることがわかります。多くの友人が疑問を抱くだろうと思います。 KafkaThread クラスにアクセスして、その内容を見てみましょう。 - /**
- *物事をうまくセットアップするThreadのラッパー
- */
- パブリッククラスKafkaThreadはThreadを拡張します{
- プライベート最終ロガー ログ = LoggerFactory.getLogger(getClass());
- public KafkaThread(final String name 、Runnable runnable、boolean daemon) {
- super(実行可能オブジェクト、名前);
- //バックグラウンドデーモンスレッドとして設定
- setDaemon(デーモン);
- UncaughtExceptionHandler を設定します(新しい Thread.UncaughtExceptionHandler() {
- パブリックvoid uncaughtException(スレッド t、Throwable e) {
- log.error( "" + name + " でキャッチされない例外: " , e);
- }
- });
- }
- }
KafkaThread スレッドは実際にはデーモン スレッドを開始するだけであることがわかっていますが、これを行う利点は何でしょうか?答えは、ビジネス コードとスレッド自体を分離し、複雑なビジネス ロジックを KafkaThread などのスレッドに実装できるということです。このように、Sender スレッドはコード レベルで非常に簡潔かつ読みやすくなります。 まず、Sender オブジェクトの構造を見てみましょう。 /** * 主な機能は、バックグラウンドスレッドを使用してKafkaクラスタへのプロダクションリクエストを処理し、メタデータ情報を更新し、適切なノードにメッセージを送信することです。 * Kafka クラスターへの生成リクエストの送信を処理するバックグラウンド スレッド。このスレッドはメタデータを作成します * クラスターのビューを更新するように要求し、適切なノードに生成要求を送信します。 - パブリッククラスSenderはRunnableを実装します{
- プライベート静的最終ロガー ログ = LoggerFactory.getLogger(Sender.class);
- //Kafka ネットワーク通信クライアント。主にブローカーとのネットワーク通信に使用されます。
- プライベート最終 KafkaClient クライアント;
- //メッセージアキュムレータ、バッチメッセージレコードを含む
- プライベート最終 RecordAccumulator アキュムレータ;
- //クライアントのメタデータ情報
- プライベート最終メタデータメタデータ;
- /* プロデューサーがメッセージの順序を保証するかどうかを示すフラグ ブローカーまたは ない。 */
- //メッセージの順序を保証するためにマークする
- プライベート最終ブール値 guaranteeMessageOrder;
- /* 最大リクエストサイズ サーバーに送信を試みる* /
- //対応する設定はmax.request.sizeで、これはsend() メソッドを呼び出して送信される最大リクエスト サイズを表します。
- プライベート最終int maxRequestSize;
- /*サーバーに要求する確認応答の数 */
- //メッセージの送信ステータスを確認するために使用します。3 つのオプションがあります: -1、0、1
- プライベート最終ショート ACK。
- /* 失敗したリクエストを諦める前に再試行する回数* /
- //リクエスト失敗後の再試行回数
- プライベート最終int再試行;
- /*時間を取得するために使用されるクロックインスタンス*/
- //時間ツール、時間を計算、特別な意味はありません
- プライベート最終時間 時間;
- /*送信スレッドがまだ実行中の場合はtrue */
- //スレッドの状態を示します。trueは実行中を意味します
- プライベート揮発性ブール値の実行;
- /*真実 発信者が望むとき 無視する 未送信/送信中のすべてのメッセージと 力 近い。 */
- //メッセージ送信のフラグを強制的に閉じます。 trueに設定すると、メッセージは正常に送信されたかどうかに関係なく無視されます。
- プライベート volatile ブール値 forceClose;
- /* メトリック */
- //インジケーターコレクションを送信
- プライベート最終SenderMetricsセンサー;
- /*クライアントのパラメータ clientId */
- //プロデューサークライアントID
- プライベート文字列クライアントID;
- /*最大値 時間 サーバーがリクエストに応答するのを待つ* /
- //リクエストタイムアウト
- プライベート最終intリクエストタイムアウト;
- //コンストラクタ
- パブリックSender(KafkaClientクライアント、
- メタデータ メタデータ、
- RecordAccumulator アキュムレータ、
- ブール型 guaranteeMessageOrder、
- 整数最大リクエストサイズ、
- ショートACK、
- int再試行、
- メトリクスメトリクス、
- 時間 時間、
- 文字列クライアントID、
- intリクエストタイムアウト) {
- this.client = クライアント;
- this.accumulator = アキュムレータ;
- this.metadata = メタデータ;
- this.guaranteeMessageOrder = guaranteeMessageOrder;
- this.maxRequestSize = maxRequestSize;
- this.running = true ;
- this.acks = acks;
- this.retries = 再試行;
- this.time =時間;
- this.clientId = クライアントID;
- this.sensors = 新しい SenderMetrics(metrics);
- this.requestTimeout = リクエストタイムアウト;
- }
- ....
- }
Sender オブジェクトの初期化パラメータの概要を理解したら、実際に Sender オブジェクトの run() メソッドを見つけてみましょう。 - パブリックボイド実行(){
- log.debug( "Kafka プロデューサー I/O スレッドを開始しています。" );
- //送信スレッドが開始されると、連続実行状態になります
- (実行中){
- 試す {
- //コアコード
- 実行(時間.ミリ秒());
- } キャッチ (例外 e) {
- log.error( "Kafka プロデューサー I/O スレッドでキャッチされないエラーが発生しました: " , e);
- }
- }
- log.debug( "Kafka プロデューサー I/O スレッドのシャットダウンを開始し、残りのレコードを送信しています。" );
- // リクエストの受付を停止しましたが、まだ残っている可能性があります
- //アキュムレータ内のリクエストまたは確認応答を待機中、
- // これらが完了するまで待機します。
- while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
- 試す {
- 実行(時間.ミリ秒());
- } キャッチ (例外 e) {
- log.error( "Kafka プロデューサー I/O スレッドでキャッチされないエラーが発生しました: " , e);
- }
- }
- if (強制終了) {
- //未完了のバッチをすべて失敗させ、待機中のスレッドを起動する必要があります。
- // 将来。
- this.accumulator.abortIncompleteBatches();
- }
- 試す {
- this.client.close () ;
- } キャッチ (例外 e) {
- log.error( "ネットワーククライアントを閉じることができませんでした" , e);
- }
- log.debug( "Kafka プロデューサー I/O スレッドのシャットダウンが完了しました。" );
- }
上記の run() メソッドには、2 つの while 判断があり、どちらもスレッドを中断せずに実行し続け、ブローカーにメッセージを送信することを目的としています。どちらの場所でも、時間パラメータを持つ別の run(xx) オーバーロード メソッドを呼び出します。最初の run(ts) メソッドは、メッセージ バッファー内のメッセージをブローカーに送信します。 2 番目の run(ts) メソッドは、まずスレッドを強制的に閉じるかどうかを判断します。強制的に閉じられていない場合は、スレッドを終了する前にメッセージ バッファー内の未送信メッセージが送信されます。 /** * 送信を1回繰り返して実行する * * @param 現在 * 現在のPOSIX時間(ミリ秒) */ - void run(long now) {
-
- //最初のステップはメタデータを取得することです
- クラスター クラスター = メタデータ。フェッチ();
- //送信準備が整ったデータを含むパーティションのリストを取得します
-
- //2番目のステップは、どのパーティションが送信条件を満たすかを判断することです
- RecordAccumulator.ReadyCheckResult 結果 = this.accumulator.ready(cluster, now);
- /**
- * ステップ3: メタデータがまだ取得されていないトピックを特定する
- */
- 結果が空の場合
- //セット リーダーが不明なトピックには、リーダー選出が保留中のトピックと
- // 期限が切れている可能性のあるトピック。トピックをメタデータに再度追加して、確実に含まれるようにします。
- //トピックに送信するメッセージがあるため、メタデータの更新を要求します。
- (文字列トピック: result.unknownLeaderTopics)の場合
- this.metadata.add (トピック);
- this.metadata.requestUpdate();
- }
- //送信する準備ができていないノードを削除します
- イテレータ<Node> iter = result.readyNodes.iterator();
- long notReadyTimeout = Long.MAX_VALUE;
- (iter.hasNext()) の間 {
- ノード node = iter.next ();
- /**
- * ステップ 4: データを送信するホストとのネットワークが確立されているかどうかを確認します。
- */
- //戻り値がfalseの場合
- if (!this.client.ready(node, now)) {
- // 結果からメッセージを送信するホストを削除します。
- //ここですべてのホストが削除されることがわかります
- iter.remove();
- notReadyTimeout = 数学。 min (notReadyTimeout、this.client.connectionDelay(node、now));
- }
- }
/** * 5 番目のステップでは、送信するパーティションが多数ある可能性があります。この場合、このような状況になる可能性がある * 一部のパーティションのリーダー パーティションは同じサーバー上に分散されます。 * * */ - Map< Integer , List<RecordBatch>> batches = this.accumulator.drain(cluster,
- 結果.readyNodes、
- this.maxRequestSize、
- 今);
- if (メッセージ順序を保証する) {
- // 排出されたすべてのパーティションをミュートする
- //バッチが空の場合は実行をスキップします。
- for (List<RecordBatch> batchList : batches.values ( )) {
- ( RecordBatch バッチ: batchList)
- this.accumulator.mutePartition(batch.topicPartition);
- }
- }
/** * ステップ6: 時間指定バッチ処理 * */ - リスト<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
- //センサーを更新
- ( RecordBatch期限切れバッチ: 期限切れバッチ)
- this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
- sensors.updateProduceRequestMetrics(バッチ);
- /**
* ステップ 7: メッセージを送信するリクエストを作成し、バッチで送信してネットワーク伝送コストを削減し、スループットを向上させます。 */ - <ClientRequest> リクエストをリストします。リクエストは、バッチファイルから作成されます。
- // 送信準備が整ったノードがあり、送信可能なデータがある場合は、 0タイムアウトでポーリングして、すぐに送信できるようにします。
- // ループしてさらにデータを送信してみます。それ以外の場合、タイムアウトはデータのあるパーティションを持つノードによって決定されます。
- // まだ送信可能ではありません (例: 保留中、バックオフ中)。これにはノードは含まれないことに注意してください
- //送信可能なデータはビジー ループの原因となるため、送信する準備ができていません。
- long pollTimeout = Math. min (result.nextReadyCheckDelayMs、notReadyTimeout);
- (result.readyNodes.size () > 0)の場合{
- log.trace( "送信準備のできたデータを持つノード: {}" , result.readyNodes);
- log.trace( "{} が作成され、リクエストが生成されました: {}" , requests.size (), requests);
- ポーリングタイムアウト = 0;
- }
- //リクエスト送信操作
- (ClientRequestリクエスト: リクエスト)
- // op_write をバインドする
- client.send(リクエスト、今);
-
- //すでに送信準備が整っているパーティションがある場合、選択 時間は0になります。
- // それ以外の場合、一部のパーティションにすでにデータが蓄積されているがまだ準備ができていない場合、
- //選択 時刻は現在とリンガー有効期限の間の時間差になります。
- // それ以外の場合は選択 時刻は現在とメタデータの有効期限の間の時間差になります。
- /**
* ステップ8: NetWordClientコンポーネントは、実際にネットワーク操作を実行するコンポーネントです。 * 含まれるもの: リクエストの送信、応答の受信(応答の処理) - this.client.poll(pollTimeout, 現在);
上記の run(long) メソッドの実行プロセスは、次の手順にまとめられます。 1. クラスターのメタデータ情報を取得する 2. RecordAccumulator の ready() メソッドを呼び出して、現在のタイムスタンプで送信できるパーティションを判別し、パーティション リーダー パーティションのメタデータ情報を取得して、メッセージを受信できるノードを見つけます。 3. メタデータにまだプルされていないトピックをマークします。キャッシュ内にunknownLeaderTopicsとしてマークされたトピック情報がある場合は、これらのトピックをメタデータに追加し、メタデータのrequestUpdate()メソッドを呼び出してメタデータの更新を要求します。 4.ステップで返された結果からメッセージを受信する必要のないノードを削除し、メッセージを受信する準備ができているノードのみを走査し、送信するノードとのネットワークが確立されているかどうかを確認し、送信条件を満たさないノードをreadyNodeから削除します。 5. ネットワーク接続が確立されたノード セットに対して、RecordAccumulator のdrain() メソッドを呼び出して、送信を待機しているメッセージ バッチ セットを取得します。 6. タイムアウトしたメッセージを処理し、RecordAccumulator の addExpiredBatches() メソッドを呼び出し、RecordBatch をループして、その中のメッセージがタイムアウトしたかどうかを判断します。そうであれば、キューから削除してリソーススペースを解放します。 7. メッセージを送信するリクエストを作成し、createProducerRequest メソッドを呼び出して、メッセージ バッチを ClientRequest オブジェクトにカプセル化します。バッチは通常複数であるため、リストが返されます。集める 8. NetworkClientのsend()メソッドを呼び出し、KafkaChannelのop_write操作をバインドします。 9. NetworkClientのpoll()メソッドを呼び出してメタデータ情報を取得し、接続を確立し、ネットワーク要求を実行し、応答を受信し、メッセージの送信を完了します。 上記は、メッセージとクラスター メタデータの Sender スレッドのコア プロセスです。これには、別のコア コンポーネントである NetworkClient が関係します。 2. ネットワーククライアントNetworkClient はメッセージを送信するための媒体です。メッセージを送信するプロデューサーであっても、メッセージを受信するコンシューマーであっても、ネットワーク接続を確立するには NetworkClient に依存する必要があります。同様に、まず NetworkClient のコンポーネントを理解します。これには主に NIO に関する知識が必要です。興味のある方は、NIO の原理と構成をご覧ください。 - /**
- *非同期要求/応答ネットワーク I/O用のネットワーク クライアント。これは、
- *ユーザー向けのプロデューサー クライアントとコンシューマー クライアント。
- * <p>
- * このクラスは スレッドセーフではありません!
- */
- パブリッククラスNetworkClientはKafkaClientを実装します
- {
-
- プライベート静的最終ロガー ログ = LoggerFactory.getLogger(NetworkClient.class);
- /* ネットワークI/Oを実行するために使用されるセレクタ*/
- //java NIOセレクター
- private final 選択可能なセレクター。
- プライベート最終メタデータ更新者メタデータ更新者;
- プライベート最終ランダム randOffset;
- /*各ノードの接続状態*/
- プライベート最終 ClusterConnectionStates 接続状態;
- /*セット 現在送信中または応答を待っているリクエストの数*/
- プライベート最終 InFlightRequests inFlightRequests;
- /* ソケット送信バッファサイズ バイト単位*/
- プライベート最終int socketSendBuffer;
- /* ソケット受信バッファサイズ(バイト単位) */
- プライベート最終int socketReceiveBuffer;
- /*サーバーへのリクエストでこのクライアントを識別するために使用されるクライアント ID */
- プライベート最終文字列クライアントID;
- /* サーバーにリクエストを送信するときに使用する現在の相関 ID */
- プライベートint相関;
- /*最大 時間 プロデューサーがサーバーからの確認応答を待つ時間(ミリ秒) */
- プライベート最終int requestTimeoutMs;
- プライベート最終時間 時間;
- ......
- }
NetworkClient は、Selectable、MetadataUpdater、ClusterConnectionStates、InFlightRequests などのいくつかのコア クラスを含む KafkaClient インターフェイスを実装していることがわかります。 2.1 選択可能その中でも、Selectable は非同期のノンブロッキング ネットワーク IO を実装するインターフェースです。クラス注釈から、Selectable は読み取り、書き込み、接続、その他の操作を含む複数のネットワーク接続を単一のスレッドで管理できることがわかります。これは NIO と一致しています。 まず、org.apache.kafka.common.network パッケージにある Selectable 実装クラス Selector を見てみましょう。ソースコードはたくさんあるので、比較的重要なものをピックアップしてみましょう。 - パブリッククラスSelectorはSelectableを実装します{
- 公共 静的最終long NO_IDLE_TIMEOUT_MS = -1;
- プライベート静的最終ロガー log = LoggerFactory.getLogger(Selector.class);
- //このオブジェクトはjavaNIOのセレクタです
- //セレクターは、ネットワークの確立、ネットワーク要求の送信、実際のネットワーク IO の処理を担当します。
- //最もコアとなるコンポーネントと言えます。
- プライベート最終 java.nio.channels.Selector nioSelector;
- //Broker と KafkaChannel (SocketChannel) のマッピング
- //ここでのkafkaChannelは、一時的にSocketChannelとして理解できます
- //NodeIdとKafkaChannel間のマッピング関係を維持する
- プライベート最終 Map<String, KafkaChannel> チャネル;
- //送信されたリクエストを記録する
- プライベート最終リスト<Send> 送信完了;
- //受信され処理された応答を記録します。
- プライベート最終リスト<NetworkReceive>completedReceives;
- // 応答は受信されましたが、まだ処理されていません。
- // 1 つの接続は 1 つの応答キューに対応します
- プライベート最終 Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
- private final <SelectionKey> を直ちに接続キーに設定します。
- //ホストへの接続またはポート接続が確立されていません
- プライベート最終List<String>切断されました。
- //ホスト接続を完了する
- プライベート最終リスト<String>接続;
- //接続に失敗したホスト。
- プライベート最終リスト<String> failedSends;
- プライベート最終時間 時間;
- プライベート最終SelectorMetricsセンサー;
- プライベート最終文字列metricGrpPrefix;
- プライベート最終 Map<String, String> metricTags;
- //KafkaChannel を作成するためのビルダー
- プライベート最終チャネルビルダーチャネルビルダー;
- プライベート最終int maxReceiveSize;
- プライベート最終ブール値 metricsPerConnection;
- プライベートファイナルIdleExpiryManagerアイドルExpiryManager;
ネットワーク要求を開始する最初のステップは、接続、イベントの登録、メッセージの送信および処理であり、これにはいくつかのコア メソッドが含まれます。 1. connect() メソッドを接続する - /**
- *指定されたアドレスへの接続を開始し、 接続を追加する 指定されたIDに関連付けられたこのnioSelectorに
- * 番号。
- * <p>
- * この呼び出しは接続を開始するだけであり、これは将来の {@link #poll(long)}で完了することに注意してください。
- * 電話。特定のポーリング呼び出し後にどの接続が完了したか(ある場合) を確認するには、{@link #connected()}をチェックします。
- * @param id新しい接続のID
- * @param addressアドレス 接続する に
- * @param sendBufferSize新しい接続の送信バッファ
- * @param receivedBufferSize新しい接続の受信バッファ
- *すでに接続がある場合はIllegalStateExceptionをスローします そのIDの
- *ホスト名のDNS解決に失敗した場合、またはブローカーがダウンしている場合はIOExceptionがスローされます
- */
- @オーバーライド
- パブリックvoid connect (String id, InetSocketAddress address, int sendBufferSize, int receivedBufferSize) は IOException をスローします {
- if (this.channels.containsKey(id))
- throw new IllegalStateException( "id の接続が既に存在します " + id);
- //ソケットチャネルを取得する
- SocketChannel socketChannel = SocketChannel。開ける();
- //非ブロッキングモードに設定
- socketChannel.configureBlocking( false );
- ソケット socket = socketChannel.socket();
- socket.setKeepAlive( true );
- //送信バッファサイズや受信バッファサイズなどのネットワークパラメータを設定する
- (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) の場合
- socket.setSendBufferSize(sendBufferSize);
- (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE) の場合
- socket.setReceiveBufferSize(受信バッファサイズ);
- //デフォルト値はfalseで、Nagle アルゴリズムが有効になることを意味します。
- //ネットワーク内の小さなデータパケットを収集し、それらを大きなデータパケットに結合して送信します
- //ネットワーク内で多数の小さなデータパケットが送信されると、伝送効率に影響を与えると考えられるため
- socket.setTcpNoDelay( true );
- ブール接続;
- 試す {
- //非ブロッキングなのでサーバーに接続してみる
- //接続がすぐに成功する可能性があり、成功した場合はtrueを返します
- //接続に成功するまでに長い時間がかかり、 falseが返されることもあります。
- connected = socketChannel.connect (アドレス);
- } キャッチ (UnresolvedAddressException e) {
- socketChannel.close () ;
- 新しい IOException をスローします ( "アドレスを解決できません: " + address, e);
- } キャッチ (IOException e) {
- socketChannel.close () ;
- eを投げる;
- }
- //SocketChannel はセレクタに OP_CONNECT を登録します
- 選択キーキー= socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
- //SocketChannel に基づいて KafkaChannel をカプセル化します
- KafkaChannel チャネル = channelBuilder.buildChannel(id、キー、最大受信サイズ);
- //キーをKafkaChannelに関連付ける
- //キーに基づいてKafkaChannelを見つけることができます
- //KafkaChannelに基づいてキーを見つけることもできます
- キー.attach(チャネル);
- //キャッシュする
- this.channels.put(id, チャネル);
-
- //接続されている場合
- (接続)の場合{
- // OP_CONNECTはトリガーされません すぐに接続できるチャンネル
- log.debug( "ノード {} にすぐに接続されました" , channel.id());
- すぐにConnectedKeys.add (キー) を実行します。
- // 以前に登録した OP_CONNECT イベントをキャンセルします。
- キー.interestOps(0);
- }
- }
2. レジスタregister() - /**
- *既存のチャネルにnioSelectorを登録する
- *接続が確立していない場合は、サーバー側でこれを使用します 別のスレッドによって受け入れられるが、セレクタによって処理される
- *接続IDが有効かどうかはチェックしていないことに注意してください-接続はすでに存在しているためです
- */
- パブリックvoid register(String id, SocketChannel socketChannel) は ClosedChannelException をスローします {
- //独自のセレクタにOP_READイベントを登録する
- //このようにして、プロセッサ スレッドはクライアントから送信された接続を読み取ることができます。
- 選択キーキー= socketChannel.register(nioSelector, SelectionKey.OP_READ);
- //Kafka は SocketChannel の KakaChannel をカプセル化します
- KafkaChannel チャネル = channelBuilder.buildChannel(id、キー、最大受信サイズ);
- //キーとチャンネル
- キー.attach(チャネル);
- //サーバー上のコードはクライアント上のネットワークコードと再利用されます
- //チャネルは複数のネットワーク接続を維持します。
- this.channels.put(id, チャネル);
- }
3. send() を送信する - /**
- * 指定されたリクエストをキューに入れて、後続の{@link #poll(long)}呼び出しで送信します。
- * @param send送信するリクエスト
- */
- パブリックvoid send(Send send) {
- //KafakChannel を取得する
- KafkaChannel チャネル = channelOrFail(send.destination());
- 試す {
- //重要なメソッド
- チャネルをsetSend(送信);
- } キャッチ (CancelledKeyException e) {
- this.failedSends.add ( send.destination ());
- 閉じる(チャネル)
- }
- }
4. メッセージ処理poll() - @オーバーライド
- パブリックvoid poll(long timeout) は IOException をスローします {
- (タイムアウト < 0)の場合
- 新しい IllegalArgumentException をスローします ( "タイムアウトは >= 0 である必要があります" );
- // 最後のpoll()メソッドによって返された結果をクリアします
- クリア();
- if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
- タイムアウト = 0;
- /* 準備完了キーをチェック*/
- 長いstartSelect = time .nanoseconds();
- //セレクターから登録されているキーの数を調べ、I/Oイベントが発生するのを待ちます
- int readyKeys =選択(タイムアウト);
- long endSelect = time .nanoseconds();
- this.sensors.selectTime.record(endSelect - startSelect, time .milliseconds());
- //キーは確かに上に登録されました
- (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty())の場合{
- //I/Oイベントを処理し、このセレクタのキーを処理します
- 選択キーをポーリングします(this.nioSelector.selectedKeys(), false , endSelect);
- pollSelectionKeys(すぐに接続されたキー、 true 、endSelect);
- }
- // stagedReceives内のデータを処理する必要がある
- 完了した受信を追加します();
- long endIo = time .nanoseconds();
- this.sensors.ioTime.record(endIo - endSelect, time .milliseconds());
- //時間を使用します 最後に の 選択 閉鎖されないようにするために いかなるつながりも
- // pollSelectionKeysで処理されたばかりです
- //処理が完了したら、長いリンクを閉じます
- おそらく最も古い接続を閉じます(endSelect);
- }
5.セレクターでキーを処理する - // OP_CONNECT、OP_READ、OP_WRITE イベントの処理に使用され、接続ステータスの検出も担当します
- プライベート void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
- ブール値 isImmediatelyConnected、
- 長いcurrentTimeNanos) {
- //すべてのキーを取得
- イテレータ<SelectionKey> iterator = selectionKeys.iterator();
- //すべてのキーを反復処理する
- (イテレータ.hasNext()) の間 {
- 選択キーキー= iterator.next ( );
- イテレータを削除します。
- //キーに応じて対応する KafkaChannel を検索します
- KafkaChannel チャネル = チャネル(キー);
- //接続ごとのメトリックをすべて一度に登録する
- センサー.maybeRegisterConnectionMetrics(channel.id());
- (idleExpiryManager != null )の場合
- アイドル有効期限マネージャー。更新(channel.id()、currentTimeNanos);
- 試す {
- /* ハンドシェイクが完了した接続を完了します(通常または即時)*/
- //接続完了とOP_CONNECTイベントを処理する
- if (isImmediatelyConnected ||キー.isConnectable()) {
- //ネットワーク接続を完了します。
- チャネルが接続を完了している場合
- //ネットワーク接続が完了したら、このチャネルを接続セットに追加します
- this.connected.add (channel.id());
- this.sensors.connectionCreated.record();
- SocketChannel socketChannel = (SocketChannel)キー.channel();
- log.debug( "SO_RCVBUF = {}、SO_SNDBUF = {}、SO_TIMEOUT = {} でノード {} にソケットを作成しました" 、
- socketChannel.socket().getReceiveBufferSize()、
- socketChannel.socket().getSendBufferSize()、
- socketChannel.socket().getSoTimeout()、
- チャンネルID();
- }それ以外
- 続く;
- }
- /* チャンネルが 準備ができていません完了準備*/
- //ID認証
- チャネルが接続済みの場合 (チャネルが準備完了の場合)
- チャネルを準備します();
- /* チャネルが読み取り可能な状態の場合 から 読み取り可能なデータを持つ接続 * /
- チャネルが準備完了の場合、キーは読み取り可能になり、ステージング受信が完了します。
- ネットワーク受信ネットワーク受信;
- //OP_READ イベントを処理し、サーバーから返された応答 (リクエスト) を受け入れます
- //networkReceiveはサーバーから返された応答を表します
- ((networkReceive = channel.read ()) != null )の場合
- ステージング受信にチャネルを追加します。
- }
- /* チャネルが準備できたら書き込み 空きのあるソケット バッファと データがあるもの * /
- //OP_WRITEイベントを処理する
- チャネルが準備完了の場合、キーは書き込み可能になります。
- //送信するネットワーク要求を取得し、データをサーバーに送信する
- //メッセージが送信されると、OP_WRITEは削除されます
- 送信 send = channel.write();
- //応答メッセージが送信されました。
- if (送信 != null ) {
- this.completedSends.add (送信) ;
- this.sensors.recordBytesSent(channel.id(), send.size ());
- }
- }
- /* 機能していないソケットをキャンセルします */
- if (!キー.isValid()) {
- 閉じる(チャネル)
- this.disconnected.add (channel.id());
- }
- } キャッチ (例外 e) {
- 文字列desc = channel.socketDescription();
- if (e インスタンス IOException)
- log.debug( "{} との接続が切断されました" , desc , e);
- それ以外
- log.warn( "{} から予期しないエラーが発生しました。接続を閉じています" , desc , e);
- 閉じる(チャネル)
- //失敗した接続のコレクションに追加します
- this.disconnected.add (channel.id());
- }
- }
- }
2.2 メタデータアップデータこれは、NetworkClient がクラスター メタデータ情報の更新を要求し、クラスター ノードを取得するために使用するインターフェイスです。これは、DefaultMetadataUpdater と ManualMetadataUpdater という 2 つの実装クラスを持つ、スレッドセーフではない内部クラスです。 NetworkClient は、NetworkClient のデフォルトの実装クラスであり、NetworkClient の内部クラスでもある DefaultMetadataUpdater クラスを使用します。ソースコードから見ると次のようになります。 - メタデータ更新者 == nullの場合{
- if (メタデータ == null )
- 新しい IllegalArgumentException をスローします ( "`metadata` は null であってはなりません" );
- this.metadataUpdater = 新しい DefaultMetadataUpdater(メタデータ);
- }それ以外{
- this.metadataUpdater = メタデータUpdater;
- }
- ......
- クラス DefaultMetadataUpdater は MetadataUpdater を実装します
- {
- /*現在のクラスターのメタデータ */
- //クラスターメタデータオブジェクト
- プライベート最終メタデータメタデータ;
- /*メタデータリクエストが送信され、 まだ回答を受け取っていないもの* /
- //MetadataRequest が送信されたかどうかを識別するために使用されます。すでに送信されている場合は再度送信する必要はありません
- プライベートブール値 metadataFetchInProgress;
- /*最後の タイムスタンプ いつ ブローカーノードが利用できません 接続する*/
- //利用可能なノードが見つからなかったときのタイムスタンプを記録する
- プライベート長いlastNodeAvailableMs;
- DefaultMetadataUpdater(メタデータメタデータ) {
- this.metadata = メタデータ;
- this.metadataFetchInProgress = false ;
- this.lastNoNodeAvailableMs = 0;
- }
- //クラスタノードセットを返す
- @オーバーライド
- パブリックリスト<Node> fetchNodes() {
- metadata.fetch ().nodes()を返します。
- }
- @オーバーライド
- パブリックブール値isUpdateDue(long now) {
- 戻り値: !this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0;
- }
- //コア メソッド。現在のクラスターに保存されているメタデータを更新する必要があるかどうかを判断します。更新が必要な場合は、MetadataRequestリクエストを送信します。
- @オーバーライド
- パブリックlong maybeUpdate(long now) {
- //メタデータを更新する必要がありますか?
- //次のメタデータ更新のタイムスタンプを取得する
- 長い timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
- //サーバーへの接続を次に再試行するときのタイムスタンプを取得します
- long timeToNextReconnectAttempt = Math.最大値(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - 現在、0);
- //MetadataRequestリクエストが送信されたかどうかを確認します
- 長い waitForMetadataFetch = this.metadataFetchInProgress ?整数.MAX_VALUE: 0;
- //存在する場合 利用可能なノードがありません 接続してメタデータの更新を中止する
- 長いメタデータタイムアウト = Math.最大(数学。最大(次のメタデータ更新までの時間、次の再接続試行までの時間)、
- メタデータ取得を待機します);
- メタデータタイムアウト == 0 の場合
- //このメソッドの動作とpoll()のタイムアウトの計算は
- // leastLoadedNodeの動作に大きく依存します。
- // 負荷が最も少ないノードを見つける
- ノード node = leastLoadedNode(now);
- // MetadataRequest リクエストを作成し、poll() メソッドをトリガーして実際の送信操作を実行します。
- おそらく更新します(現在、ノード);
- }
- メタデータタイムアウトを返します。
- }
- //接続が確立されていないリクエストを処理する
- @オーバーライド
- パブリックブール値 maybeHandleDisconnection(ClientRequest リクエスト) {
- ApiKeys リクエストキー = ApiKeys.forId(request.request().header().apiKey());
- リクエストキー == ApiKeys.METADATA && request.isInitiatedByNetworkClient() の場合 {
- クラスター クラスター = メタデータ。フェッチ();
- (cluster.isBootstrapConfigured())の場合{
- int nodeId = Integer .parseInt(request.request().destination());
- ノード node = cluster.nodeById(nodeId);
- if (ノード != null )
- log.warn( "ブートストラップ ブローカー {}:{} が切断されました" 、 node.host()、 node.port());
- }
- メタデータの取得が進行中 = false ;
- 戻る 真実;
- }
- 戻る 間違い;
- }
- // 応答情報を解析する
- @オーバーライド
- パブリックブール値 maybeHandleCompletedReceive(ClientRequest req, long now, 構造体本体) {
- 短い apiKey = req.request().header().apiKey();
- //MetadataRequest リクエストかどうかを確認します
- (apiKey == ApiKeys.METADATA.id && req.isInitiatedByNetworkClient())の場合{
- // レスポンスを処理する
- handleResponse(req.request().header(), body, now);
- 戻る 真実;
- }
- 戻る 間違い;
- }
- @オーバーライド
- パブリックボイドリクエストアップデート() {
- this.metadata.requestUpdate();
- }
- //MetadataRequest リクエストの応答を処理する
- private void handleResponse(RequestHeader ヘッダー、構造体本体、long now) {
- this.metadataFetchInProgress = false ;
- //サーバーはバイナリデータ構造を返すため
- //したがって、プロデューサーはこのデータ構造をここで解析する必要があります
- //解析後、それはメタデータのアプセル化されています。
- MetadatAresponse応答=新しいメタデータデータアスパンズ(body);
- //応答はメタデータ情報を戻します
- //サーバーから引っ張られたクラスターのメタデータ情報を取得します。
- Cluster Cluster = Response.Cluster();
- //トピックのメタデータが更新に失敗したかどうかを確認してください
- map <string、errors> errors = response.errors();
- if(!errors.isempty())
- log.warn( "相関IDでメタデータを取得するときのエラー{}:{}" 、header.correlationid()、errors);
- //有効なノードがない場合はクラスターを更新しないでください...私たちが望むトピックは、まだ存在するプロセスにあるかもしれません
- //作成されたのは、エラーが発生することを意味します 存在するまでノードはありません
- //メタデータ情報が正常に取得される場合
- if(cluster.nodes()。 size ()> 0){
- //メタデータ情報を更新します。
- this.metadata.update (cluster、now);
- }それ以外{
- log.trace( "相関ID {}。" 、header.correlationid())を使用した空のメタデータ応答を無視します。
- this.metadata.failedupdate(now);
- }
- }
- /**
- *指定されたトピックのメタデータリクエストを作成します
- */
- プライベートクライアントレクエストリクエスト(長い間、文字列ノード、メタデータレクストメタデータ){
- requestsend send = new requestsend(node、nextrequestheader(apikeys.metadata)、metadata.tostruct());
- 新しいclientRequestを返します(今、 true 、send、 null 、 true );
- }
- /**
- *送信のリストにメタデータリクエストを追加することができれば、それを作成できる場合は
- */
- private voidupdate(長い間、ノードノード){
- //ノードが使用可能かどうかを確認します
- if(node == null ){
- log.debug( "ノードが利用できないため、メタデータリクエストの送信をあきらめる" );
- //タイムスタンプをマークします のために 利用可能なノードはありません 接続する
- this.lastnodeavailablems = now;
- 戻る;
- }
- string nodeconnectionid = node.idstring();
- //ネットワーク接続を確立する必要があるかどうか、およびノードにリクエストを送信できるかどうかを判断します
- if(cansendrequest(nodeconnectionid)){
- this.metadatafetchinprogress = true ;
- Metadatarequestメタデータエクセスト。
- //メタデータで更新する必要があるトピックを指定します
- if(metadata.needmetadataforalltopics())
- //リクエストのメタデータ情報をカプセル化するリクエスト(すべてのトピックを取得)
- metadatarequest = metadatarequest.alltopics();
- それ以外
- //この方法デフォルトでここに行きました
- //メッセージを送信するのは、対応するトピックを引く方法です
- metadatarequest = new Metadatarequest(new ArrayList <>(metadata.topics()));
- //ここでリクエストを作成します(メタデータをプル)
- ClientRequest ClientRequest = request(Now、nodeconnectionId、metadatarequest);
- log.debug( "metadata request {}をnode {}に送信{}" 、metadatarequest、node.id());
- //キャッシュリクエスト、次回Poll()メソッドがトリガーされ、送信操作が実行されるのを待ちます
- Dosend(ClientRequest、Now);
- } else if(connectionStates.canconnect(nodeconnectionId、now)){
- //接続がありません 今すぐこのノードに、1つを作成します
- log.debug( "メタデータ要求を送信するためにnode {}への接続を初期化" 、node.id());
- //接続を初期化します
- initiateconnect(Now、Now);
- // initiateconnectがすぐに失敗した場合、このノードは停電になり、
- //すぐに再試行する必要があります 別の候補ノードがあります。もしそれが
- //最悪の場合はまだ接続しています より長いタイムアウトを設定することになっているということです
- //次のラウンドで 次に、応答を待ちます。
- } else {//接続、しかしもっと送信したり、接続したりすることはできません
- //どちらの場合でも、選択したことを知らせるためにネットワークイベントを待つ必要があります
- //接続が再び使用できる場合があります。
- this.lastnonodeavailablems = now;
- }
- }
- }
- Inflightrequest CacheキューへのCaches ClientRequestリクエスト。
- private void dosend(clientRequestリクエスト、長い状態){
- request.setsendtimems(now);
- // inflightrequestsキャッシュキューに応答要求はありません。デフォルトでは、最大5つのリクエストを保存できます
- this.inflightrequests。追加(リクエスト);
- //次に、セレクターのsend()メソッドを継続的に呼び出し続けます
- selector.send(request.request());
- }
2.3 Inflightrequestsこのクラスはリクエストキューであり、送信されたが応答を受け取っていないクライアントリケストをキャッシュするために使用されます。キャッシュキューを管理するための多くの方法を提供し、構成パラメーターを介してクライアントリケストの数の制御をサポートし、ソースコードを介してその基礎となるデータ構造マップを見ることができます。 - /**
- *セット 送信された、または送信されているがまだ返信を受けていないリクエストの
- */
- 最終クラスのInflightRequests {
- プライベートファイナルint maxinflightrequestsperconnection;
- プライベート最終マップ<string、deque <clientRequest >> requests = new Hashmap <>();
- public Inflightrequests( int maxinflightrequestsperconnection){
- this.maxinflightrequestsperconnection = maxinflightrequestsperconnection;
- }
- ......
- }
キューの処理に関する多くの方法を含めることに加えて、Cansendmore()に焦点を合わせるためのより重要な方法があります。 - /**
- *このノードにさらにリクエストを送信できますか?
- *
- *問題の@paramノードノード
- * @戻る 特定のノードにまだリクエストが送信されていない場合はtrue
- */
- Public Boolean Cansendmore(String node){
- // clientRequestキューをノードに送信する
- deque <ClientRequest> queue = requests.get(node);
- //ノードに要求が蓄積され、時間内に処理されない場合、リクエストタイムアウトが発生する可能性があります。
- return queue == null || queue.isempty()||
- (queue.peekfirst()。request()。完了()
- && 列。 size ()<this.maxinflightrequestsperconnection);
- }
上記のコアクラスを理解した後、NetworkClientのプロセスと実装の分析を開始します。 2.4 NetworkClient Kafkaのすべてのメッセージは、NetworkClientを介して上流と下流で確立する必要があり、その重要性は自明です。ここでは、メッセージの成功プロセスのみを考慮し、例外処理は解析されず、比較的重要ではありません。メッセージを送信するプロセスは、ほぼ次のとおりです。 1.最初にReady()メソッドを呼び出して、ノードがメッセージを送信する条件を満たしているかどうかを判断します 2。iSready()メソッドを使用して、より多くの要求をノードに送信できるかどうかを判断します。これは、要求の蓄積があるかどうかを確認するために使用されます。 3。初期コネクトを使用して接続を初期化します 4.次に、Selector's Connect()メソッドを呼び出して接続を確立します 5.Socketchannelを入手して、サーバーとの接続を確立します 6. socketchannelのセレクターでop_connectイベントを登録します 7。send()を呼び出してリクエストを送信します 8。リクエストを処理するためにPoll()メソッドを呼び出します 以下は、各プロセスに関連する主要な操作を理解するためのメッセージ送信プロセスに含まれるコアメソッドに基づいた分析です。 1.ノードがメッセージ送信条件を満たしているかどうかを確認します - /**
- *指定されたノードへの接続を開始し、返します すでに接続されていて、そのノードに送信する準備ができている場合はTRUEです。
- *
- * @paramノードにノード チェック
- * @param今すぐに タイムスタンプ
- * @戻る 特定のノードに送信する準備ができている場合はtrue
- */
- @オーバーライド
- public boolean Ready(Node Node、Long Now){
- if(node.isempty())
- 新しいIllegalargumentException( 「空のノードに接続できない」 +ノード)を投げます。
- //メッセージを送信するためのホストがメッセージを送信する条件を満たすかどうかを識別します
- if(isready(node、now))
- 戻る 真実;
- //ネットワークの構築を試みることができるかどうかを判断します
- if(connectionstates.canconnect(node.idstring()、now))
- //ノードへの送信に興味があり、接続がない場合 それに、1つを開始します
- //接続を初期化します
- //イベントに接続するためにバインド
- initiateconnect(Now、Now);
- 戻る 間違い;
- }
2。接続を初期化します - /**
- *接続を開始します 指定されたノードに
- */
- private void initiateConnect(ノードノード、長い){
- string nodeconnectionid = node.idstring();
- 試す {
- log.debug( "{}でnode {}への接続の開始{}:{}。" 、node.id()、node.host()、node.port());
- this.connectionStates.connecting(nodeconnectionId、now);
- //接続の確立を開始します
- セレクタ。 connect (nodeconnectionid、
- new inetsocketAddress(node.host()、node.port())、
- this.socketsendbuffer、
- this.SocketReceiveBuffer);
- } キャッチ (IOException e) {
- / *試行が失敗しました、バックオフ後に再試行します */
- connectionStates.disconnected(nodeconnectionId、now);
- / *問題は私たちのメタデータかもしれません、それを更新*/
- metadataupdater.requestupdate();
- log.debug( "{}でnode {}に接続するエラー{}:{}:" 、node.id()、node.host()、node.port()、e);
- }
- }
3. initiateconnect()メソッドで呼び出される接続()メソッドは、セレクタークラスの選択可能な実装のconnect()メソッドです。 Socketchannelの取得とop_connect、op_read、およびop_writeイベントの登録が含まれます。上記の分析が行われました。ここでは詳細には触れません。上記の一連のアクションを完了してネットワーク接続を確立した後、メッセージ要求はダウンストリームノードに送信されます。送信者のsend()メソッドは、networkclientのsend()メソッドをsendに呼び出し、networkclientのsend()メソッドは最終的にセレクターのsend()メソッドを呼び出します。 - /**
- * 指定されたリクエストを送信キューに入れます。リクエストは送信することしかできません 準備ができたノードに。
- *
- * @param request リクエスト
- * @param今すぐに タイムスタンプ
- */
- @オーバーライド
- public void send(clientRequestリクエスト、長い今){
- string nodeid = request.request()。destination();
- //接続ステータスを確立したノードがより多くのリクエストを受信できるかどうかを識別します
- if(!cansendrequest(nodeid))
- 新しいIllegalStateExceptionを投げます( "Node" + nodeid + "にリクエストを送信しようとする試みができていません。" );
- // clientRequestを送信します
- Dosend(リクエスト、今);
- }
- private void dosend(clientRequestリクエスト、長い状態){
- request.setsendtimems(now);
- //キャッシュリクエスト
- this.inflightrequests。追加(リクエスト);
- selector.send(request.request());
- }
4.最後に、リクエストを処理するためにPoll()メソッドに電話してください - /**
- *ソケットへの実際の読み取りと書き込みを実行します。
- *
- * @paramタイムアウトの最大量 時間 すぐに何もない場合は( MSで)応答を待つために、
- * は負でない値である必要があります。実際のタイムアウトは、タイムアウト、リクエストタイムアウト、および
- * メタデータのタイムアウト
- * @param今すぐに 時間 ミリ秒単位
- * @受信した応答のリストを返します
- */
- @オーバーライド
- パブリックリスト<ClientResponse> Poll(Long Timeout、Long Now){
-
- //ステップ1:メタデータを更新するリクエスト
- long metadatimeout = metadataupdater.maybeupdate(now);
- 試す {
-
- //ステップ2:I/O操作を実行してリクエストを送信します
- this.selector.poll( utils。min (timeout、metadatimeout、request timeoutms));
- } キャッチ (IOException e) {
- log.Error( "I/Oの間の予期しないエラー" 、e);
- }
- // 完了したアクションを処理する
- long updatednow = this。 time .milliseconds();
- リスト<ClientResponse> responses = new ArrayList <>();
- //ステップ3:さまざまな応答を処理します
- HandleCompletedSends(Responses、updatedNow);
- HandleCompletedReceives(Responses、updatedNow);
- hondledisconnections(responses、updatednow);
- handleconnections();
- handletimedoutrequests(responses、updatednow);
- //コールバックを呼び出します
- // loop Call clientRequestのコールバック関数を呼び出します
- for (clientResponse応答:応答){
- if(respons.request()。hascallback()){
- 試す {
- Response.Request()。callback()。oncomplete(response);
- } キャッチ (例外 e) {
- log.Error( "要求完了時の有名なエラー:" 、e);
- }
- }
- }
- 返信応答。
- }
興味のある子供の靴は、コールバック関数のロジックとセレクターの動作を引き続き探求できます。 追加メモ: Kafkaのメモリプールは含まれない上記のポイントがあります。 BufferPoolクラスを見ることができます。この知識ポイントは、前の記事で言及する必要があります。私は突然それを思い出し、それを逃しました。ここに追加します。これは、前述の記録的な蓄積クラスのデータ構造に対応しています。カプセル化されたレコードカミュレータオブジェクトは複数のdqueueで構成され、各dqueuは複数のレコードバッチで構成されています。さらに、RecordAccumulatorにはBufferPoolメモリプールも含まれています。 RecordAccumulatorクラスがConcurrentMapを初期化することをここで少し思い出しましょう。 - パブリックファイナルクラスRecordAccumulator {
- ......
- プライベートファイナルバッファプール無料;
- ......
- プライベート最終的なCONCURRENTMAP <TOPPITPARTITION、deque <RecordBatch >>バッチ;
- }
図に示すように、メモリ内のarecate()とリリースdeallocate()の2つの方法に焦点を当てます。興味のある友達は個人的にそれを見ることができます。このクラスのコードは合計で300行を超えており、コンテンツはそれほど多くありません。一緒にコミュニケーションして学ぶことへようこそ。この記事のトピックに影響を与えないように、ここでは拡張しません。 3. まとめこの記事では、主に、メッセージを送信するKafkaプロデューサーの実際の執行者である送信者スレッドと、メッセージの上流および下流の伝送チャネルであるNetworkClientコンポーネントを分析します。主にNIOの適用を伴い、主にメッセージの送信を伴うコア依存関係クラスも導入します。この記事を書くことは、主に過去と未来の間のリンクとして機能します。これは、Kafkaの生産者から送信されたメッセージの以前の分析の補足であるだけでなく、上流の消費者消費の次の分析への道を開くこともできます。それは少し体系的です。記事を書くというアイデアは、主に、合計スコアのアイデアを分析する手がかりとして考慮しています。個人的には、長さが長すぎて読むのが不便だと思うので、読者に役立つことを望んで、それを合理化し、コアの方法とプロセスの分析に集中しようとします。 |