みなさんこんにちは。Hua Zaiです。またお会いできて嬉しいです。 今日は主に「Kafka クライアント メッセージ キャッシュ アーキテクチャ設計」について説明し、メッセージがどのようにキャッシュされるかを詳しく分析します。 この記事を注意深く読むと、Kafka クライアント キャッシュ アーキテクチャのソース コードに対する理解が深まると思います。 I. 全体概要シナリオ駆動型アプローチでは、送信されたメッセージがネットワーク要求を通じてカプセル化されると、NIO マルチプレクサはネットワークの読み取りおよび書き込みイベントをリッスンし、ネットワーク メッセージ送受信を行います。クライアント上でメッセージがどのようにキャッシュされるかを見てみましょう。 Kafka が超高スループットのメッセージング システムであることは誰もが知っていますが、それは主に「非同期送信」、「バッチ送信」、「メッセージ圧縮」に反映されています。 この記事に関連するのは「バッチ送信」です。つまり、プロデューサーがメッセージをキャッシュし、特定の条件が満たされると、送信者サブスレッドがメッセージをバッチで Kafka ブローカーに送信します。 これの利点は、「ネットワーク要求の数を最小限に抑え、ネットワーク スループットを向上させる」ことです。 誰もが理解しやすいように、すべてのソースコードの骨組みのみが保持されます。 2. メッセージはクライアント側でどのようにキャッシュされますか?バッチで送信されるため、メッセージをキャッシュする必要があります。では、メッセージはどこにキャッシュされるのでしょうか?どのように管理されていますか? 以下の簡略化されたフローチャートからわかるように、送信されるメッセージは主に RecordAccumulator にキャッシュされます。 実際のシナリオに例えて説明すると、理解しやすくなります。 RecordAccumulator はメッセージを蓄積する倉庫のようなものなので、宅配便の倉庫の例えを使ってみましょう。 上の写真は商品が満載の急行倉庫です。仕分け担当者が、さまざまな目的地の荷物を目的地に対応する貨物箱に入れ、いっぱいになった箱を対応するエリアに置いている様子を見ることができます。 次に、ソーターは RecordAccumulator を参照し、ボックスとそれぞれのスタッキング領域は、RecordAccumulator 内でメッセージがキャッシュされる場所です。封印された箱はすべて、送り主が受け取って発送するまで待機します。 上の図を理解すれば、RecordAccumulator のアーキテクチャ設計と動作ロジックを大まかに理解できるようになります。 倉庫にあるものを要約すると次のようになります。 - ソーター
- 品
- 行き先
- カーゴボックス
- スタッキングエリア
これらの概念を覚えておいてください。これらはソース コードに反映されます。プロセスは次の図に示されています。 上の図からわかるように、 - 少なくとも 1 つのビジネス メイン スレッドと 1 つの送信側スレッドが RecordAccumulator を同時に操作するため、スレッドセーフである必要があります。
- その中には、ConcurrentMap コレクション「 Kafka カスタマイズされた CopyOnWriteMap 」があります。キー: TopicPartiton、値: Deque<ProducerBatch>、つまり、トピック パーティションを単位として、メッセージは ProducerBatch に蓄積されキャッシュされ、複数の ProducerBatch が Deque キューに格納されます。 Deque 内の最新のバッチがメッセージを収容できない場合、キャッシュを続行するために新しいバッチが作成され、Deque に追加されます。
- Kafka は、メモリ破壊の頻繁な適用によって発生する Full GC 問題を軽減するために、ProducerBatch を介してデータをキャッシュする古典的な「 BufferPool メカニズム」を設計しています。
要約すると、RecordAccumulator クラスには、「ProducerBatch」、「Custom CopyOnWriteMap」、および「BufferPool メカニズム」という 3 つの重要なコンポーネントがあることがわかります。 スペースの制約により、RecordAccumulator クラスについては次の記事で説明します。 まず、メッセージのキャッシュと送信の最小単位である ProducerBatch について見てみましょう。 呼び出し関係から、ProducerBatch は MemoryRecordsBuilder に依存し、MemoryRecordsBuilder は MemoryRecords に依存していることがわかります。つまり、「MemoryRecords はメッセージが実際に保存される場所です」。 1. メモリレコード java をインポートします。 ニオ。 バイトバッファ; パブリッククラスMemoryRecords はAbstractRecords を拡張します{ パブリック静的MemoryRecordsBuilder ビルダー(..){
リターンビルダー(...) }
パブリック静的MemoryRecordsBuilder ビルダー( ByteBuffer バッファ、 バイトマジック、 圧縮タイプ圧縮タイプ、 タイムスタンプタイプタイムスタンプタイプ、 長いベースオフセット、 長いlogAppendTime 、 長いプロデューサーID 、 ショートプロデューサーエポック、 int ベースシーケンス、 ブール値isTransactional 、 ブール値isControlBatch 、 int パーティションリーダーエポック) {
新しいMemoryRecordsBuilder (...) を返します。 } } このクラスは比較的簡単です。ビルダー メソッドから、メッセージの保存に ByteBuffer に依存していることがわかります。 MemoryRecordsBuilder クラスの構築は、MemoryRecords.builder() を通じて初期化されます。 MemoryRecordsBuilder クラスの実装を見てみましょう。 2. メモリレコードビルダー パブリッククラスMemoryRecordsBuilderはAutoCloseable を実装します{ プライベート静的最終DataOutputStream CLOSED_STREAM = 新しいDataOutputStream ( 新しいOutputStream () {
パブリックvoid 書き込み( int b ){ 新しいものを投げる; } }); プライベート最終TimestampType timestampType ; プライベート最終圧縮タイプ圧縮タイプ; プライベート最終ByteBufferOutputStream バッファストリーム; プライベート最終バイトマジック; プライベート最終int 初期位置; プライベート最終long baseOffset ; プライベート最終長いlogAppendTime ; プライベート最終ブール値isControlBatch ; プライベート最終int パーティションリーダーエポック; プライベート最終int 書き込み制限; プライベート最終int バッチヘッダーサイズInBytes ; プライベートfloat 推定圧縮率= 1.0F ; プライベートDataOutputStream appendStream ; プライベートブール値isTransactional ; プライベートな長いプロデューサーID ; プライベートショートプロデューサーEpoch ; プライベートint ベースシーケンス; プライベートint 非圧縮レコードサイズInバイト= 0 ; プライベートint numRecords = 0 ; プライベートfloat 実際の圧縮率= 1 ; プライベートlong maxTimestamp = RecordBatch 。 タイムスタンプなし; プライベートロングオフセットOfMaxTimestamp = - 1 ; プライベートLong lastOffset = null ; プライベートLong firstTimestamp = null ; プライベートMemoryRecords 構築済み Records ; このカテゴリには多くのフィールドがありますが、ここではバイト ストリームに関する 2 つのフィールドについてのみ説明します。 - CLOSED_STREAM : ByteBuffer が閉じられると、対応する書き込み出力ストリームも CLOSED_STREAM に設定されます。目的は、データが ByteBuffer に書き込まれるのを防ぐことです。それ以外の場合は例外がスローされます。
- bufferStream : まず、MemoryRecordsBuilder はメッセージの保存を完了するために ByteBuffer に依存します。 ByteBuffer を ByteBufferOutputStream にカプセル化し、Java NIO の OutputStream を実装して、データをストリーム形式で書き込むことができるようにします。同時に、 ByteBufferOutputStream はByteBuffer を自動的に拡張する機能を提供します。
初期化構築方法を見てみましょう。 パブリックMemoryRecordsBuilder ( ByteBuffer バッファ、...) { this ( 新しいByteBufferOutputStream ( バッファ)、... )。 } パブリックメモリレコードビルダー( ByteBufferOutputStream バッファストリーム、 ... int 書き込み制限) { .... これ。 初期位置= bufferStream 。 位置(); これ。 batchHeaderSizeInBytes = AbstractRecords 。 recordBatchHeaderSizeInBytes ( マジック、 圧縮タイプ); バッファストリーム。 位置( 初期位置+ バッチヘッダーサイズ (バイト単位) ); これ。 バッファストリーム= バッファストリーム; これ。 appendStream = 新しいDataOutputStream ( compressionType.wrapForOutput ( this.bufferStream , magic )); } } コンストラクターからわかるように、基本フィールドの割り当てに加えて、次の 3 つのことが行われます。 - バッチ ヘッダーのサイズと長さは、メッセージのバージョンと圧縮タイプに基づいて計算されます。
- bufferStream の位置を調整してBatch ヘッダーの位置をスキップすることで、メッセージを直接書き込むことができます。
- bufferStream に圧縮関数を追加します。
これを見ると非常に興味深いです。読者は、ここで「ByteBuffer」、「bufferStream」、および「appendStream」が関係していることに気付いているでしょうか。 3 つの関係は、「デコレータ パターン」を通じて実現されます。つまり、bufferStream は ByteBuffer を装飾して拡張機能を実現し、appendStream は bufferStream を装飾して圧縮機能を実現します。 そのコアメソッドを見てみましょう。 (1) オフセット付き追加() public Long append ( long タイムスタンプ、 ByteBuffer キー、 ByteBuffer 値、 Header [] ヘッダー) { appendWithOffset ( nextSequentialOffset ()、 タイムスタンプ、 キー、 値、 ヘッダー) を返します。 } プライベートロングnextSequentialOffset (){ lastOffset == null を返しますか? ベースオフセット: ラストオフセット+ 1 ; } プライベートLong appendWithOffset ( ロングオフセット、 ブール型isControlRecord 、 長いタイムスタンプ、 ByteBuffer キー、 ByteBuffer 値、 ヘッダー[] ヘッダー) { 試す{ if ( isControlRecord != isControlBatch ) 新しいものを投げる; ( lastOffset != null && offset <= lastOffset ) の場合 新しいものを投げる; if ( タイムスタンプ< 0 && タイムスタンプ!= RecordBatch . NO_TIMESTAMP ) 新しいものを投げる; if ( magic < RecordBatch . MAGIC_VALUE_V2 && headers != null && headers . length > 0 ) 新しいものを投げる; if ( firstTimestamp == null ) firstTimestamp = タイムスタンプ; if ( magic > RecordBatch . MAGIC_VALUE_V1 ) { appendDefaultRecord ( オフセット、 タイムスタンプ、 キー、 値、 ヘッダー); null を返します。 } それ以外{ appendLegacyRecord ( オフセット、 タイムスタンプ、 キー、 値、 マジック) を返します。 } } IOException をキャッチします。
} } このメソッドは主にオフセットに従ってメッセージを追加するために使用され、メッセージのバージョンに従って対応するメッセージを書き込みます。ただし、ProducerBatch はバージョン V2 に対応していることは明らかです。 V2 バージョンのメッセージ書き込みロジックを見てみましょう。 プライベートvoid appendDefaultRecord ( ロングオフセット、 長いタイムスタンプ、 ByteBuffer キー、 ByteBuffer 値、 ヘッダー[] ヘッダー) はIOException をスローします{ レコードの追加のために開くことを確認します(); int offsetDelta = ( int ) ( オフセット- baseOffset ); long timestampDelta = timestamp - firstTimestamp ; int sizeInBytes = デフォルトレコード。 writeTo ( appendStream 、 offsetDelta 、 timestampDelta 、 key 、 value 、 headers ); recordWritten ( オフセット、 タイムスタンプ、 サイズ (バイト単位) ); } プライベートvoidensureOpenForRecordAppend ( ) { ( appendStream == CLOSED_STREAM ) の場合 新しいものを投げる; } private void recordWritten ( long offset , long timestamp , int size ) { .... レコード数+= 1 ; 非圧縮レコードサイズ(バイト数) += サイズ; lastOffset = オフセット; if ( magic > RecordBatch . MAGIC_VALUE_V0 && timestamp > maxTimestamp ) { maxTimestamp = タイムスタンプ; offsetOfMaxTimestamp = オフセット; } } このメソッドは主に V2 バージョン メッセージを書き込むために使用され、主に次の 5 つのことを行います。 - 書き込み可能かどうかを確認します。appendStream の状態が CLOSED_STREAM であるかどうかを判断します。そうでない場合は書き込み可能であり、そうでない場合は例外がスローされます。
- 今回どれだけのオフセットを書き込むか計算します。
- この書き込みと最初の書き込みの間の時間差を計算します。
- メッセージを V2 形式で appendStream ストリームに書き込み、圧縮前のメッセージのサイズを返します。
- 成功した場合、 RecordBatch のメタデータが更新されます。
(2) 部屋がある() public boolean hasRoomFor ( long タイムスタンプ、 ByteBuffer キー、 ByteBuffer 値、 Header [] ヘッダー) { if ( isFull ()) の場合 false を返します。 レコード数== 0 の場合 true を返します。 最終的なint レコードサイズ; if ( magic < RecordBatch . MAGIC_VALUE_V2 ) { recordSize = レコード数。 LOG_OVERHEAD + レガシーレコード。 recordSize ( マジック、 キー、 値); } それ以外{ int nextOffsetDelta = lastOffset == null ? 0 : ( int ) ( lastOffset -baseOffset + 1 ); ... recordSize = デフォルトレコード。 sizeInBytes ( nextOffsetDelta 、 timestampDelta 、 key 、 value 、 headers ); } これを返します。 writeLimit >= estimatedBytesWritten () + recordSize ; } パブリックブールisFull () { appendStream == CLOSED_STREAM を返します|| ( this . numRecords > 0 && this . writeLimit <= EstimateBytesWritten ()); } このメソッドは主に、現在の MemoryRecordsBuilder に書き込むレコードを収容するスペースがあるかどうかを推定するために使用され、以下の ProducerBatch.tryAppend() で呼び出されます。 最後に、冒頭で触れた自動拡張機能について見てみましょう。 (3) バッファ拡張() パブリッククラスByteBufferOutputStreamはOutputStream を拡張します{ プライベート静的最終浮動小数点REALLOCATION_FACTOR = 1.1f ; プライベート最終int 初期容量; プライベート最終int 初期位置; パブリックvoid 残りを確保( int 残りバイト数が必要 ) { if ( 残りのバイト数が必要> バッファ. 残りの()) expandBuffer ( 残りのバイト数が必要); } プライベートvoid expandBuffer ( int 残りの必要数) { int expandSize = Math 。 max (( int ) ( buffer . limit () * REALLOCATION_FACTOR ), buffer . position () + residualRequired ); ByteBuffer temp = ByteBuffer 。 割り当てる( expandSize ); 制限= 制限() ; バッファ。 フリップ(); 温度。 put ( バッファ); バッファ。 制限( 制限); バッファ。 位置( 初期位置); バッファ= temp ; } } このメソッドは主に、ByteBuffer を拡張する必要があるかどうかを判断するために使用されます。つまり、書き込まれたバイト数がバッファ内に残っている現在のバイト数よりも大きい場合、拡張が開始されます。拡張には次の 3 つが必要です。 - 必要なスペースの量を評価する: 「拡張スペース」と「実際に必要なバイト数」の間の最大値を取得します。ここで「拡張係数」が計算に使用されるのは、主に拡張にはシステム リソースの消費が必要になるためです。毎回実際のデータ サイズに応じてスペースを割り当てると、不要なシステム リソースが浪費されます。
- 新しいスペースを申請する: 拡張量に基づいて新しい ByteBuffer を申請し、元の ByteBuffer データをそこにコピーします。対応するソースコードのステップ:「 3 - 7 」。
- 最後に、参照は新しく適用された ByteBuffer を指します。
次に、ProducerBatch の実装を見てみましょう。 3. プロデューサーバッチ パブリックファイナルクラスProducerBatch { プライベート列挙型FinalState { ABORTED 、 FAILED 、 SUCCEEDED } 最終的な長いcreatedMs ; 最終的なトピックパーティションtopicPartition ; 最終的なProduceRequestResult を produceFuture します。 プライベート最終リスト< Thunk > thunks = new ArrayList <> (); プライベート最終MemoryRecordsBuilder recordsBuilder ; プライベート最終AtomicInteger 試行= 新しいAtomicInteger ( 0 ); プライベート最終ブール値isSplitBatch ; プライベート最終AtomicReference < FinalState > finalState = new AtomicReference <> ( null ); int レコード数; 最大レコードサイズ; プライベート長い最後の試みMs; プライベート長いlastAppendTime ; プライベートロングドレインMs; プライベートブール再試行; } パブリックProducerBatch ( TopicPartition tp 、 MemoryRecordsBuilder recordsBuilder 、 long createdMs 、 boolean isSplitBatch ) { ... これ。 produceFuture = 新しいProduceRequestResult ( topicPartition ); ... } ProducerBatch は、通常「バッチ メッセージ」と呼ばれる 1 つ以上のメッセージを格納します。 いくつかの重要なフィールドを見てみましょう。 - topicPartition : バッチに対応するトピック パーティション。現在の ProducerBatch にキャッシュされているすべてのレコードがこの TopicPartition に送信されます。
- produceFuture : ProduceRequestResult クラスによって実装される、リクエスト結果の Future。
- thunks : 各レコードに関連付けられたメッセージ コールバックと機能応答データを格納するために使用される Thunk オブジェクトのコレクション。
- recordsBuilder : MemoryRecords オブジェクトをカプセル化し、メッセージの ByteBuffer を保存するために使用されます。
- 試行回数: バッチの失敗した再試行回数。 AtomicInteger は、Integer を使用するためのアトミック操作を提供し、高並行性が必要な状況での使用に適しています。
- isSplitBatch : 分割バッチであるかどうか。これは、単一のメッセージが 1 つの ProducerBatch に格納するには大きすぎるため、複数の ProducerBatch に分割されて格納される場合です。
- dawnedMs : Sender 子スレッドがバッチをプルするのにかかる時間。
- retry : ProducerBatch 内のデータの送信に失敗した場合、再度送信を試みます。
コンストラクタでは、重要な依存コンポーネントとして「メッセージ生成結果を非同期に取得するためのクラス」である「ProduceRequestResult」があります。簡単に分析してみましょう。 (1) ProduceRequestResultクラスパブリッククラス ProduceRequestResult { プライベートファイナルCountDownLatchラッチ = 新しいCountDownLatch(1); プライベート最終トピックパーティショントピックパーティション; プライベート volatile Long baseOffset = null; パブリック ProduceRequestResult(トピックパーティション topicPartition) { this.topicPartition = トピックパーティション; } パブリックvoid done() { (baseOffset == null)の場合 新しいものを投げる; this.latch.countDown(); } パブリックvoid await()はInterruptedExceptionをスローします{ ラッチを待機します。 } パブリックブールawait(long timeout, TimeUnit unit) throws InterruptedException { 戻り値:latch.await(timeout, unit); } } このクラスは、CountDownLatch(1) を通じて Future 関数を間接的に実装し、他のすべてのスレッドがこのロックを待機するようにします。現時点では、他のすべての待機中のスレッドの実行を同時に再開するには、countDown() メソッドを 1 回呼び出すだけで済みます。 Producer がメッセージを送信すると、間接的に "ProduceRequestResult.await" が呼び出され、スレッドはサーバーからの応答を待機します。サーバーが応答すると、「ProduceRequestResult.done」が呼び出され、「CountDownLatch.countDown」が呼び出され、「CountDownLatch.await」でブロックされているメインスレッドが起動します。これらのスレッドは、ProduceRequestResult のエラー フィールドを使用して、要求が成功したか失敗したかを判断できます。 次に、ProducerBatch クラスの重要なメソッドを見てみましょう。 (2) トライアペンド() public FutureRecordMetadata tryAppend ( long timestamp 、 byte [] key 、 byte [] value 、 Header [] headers 、 Callback callback 、 long now ) { if ( ! recordsBuilder.hasRoomFor ( タイムスタンプ、 キー、 値、 ヘッダー)) { null を返します。 } それ以外{ 長いチェックサム= this 。 レコードビルダー。 append ( タイムスタンプ、 キー、 値、 ヘッダー); これ。 maxRecordSize = 数学. max ( this . maxRecordSize 、 AbstractRecords . estimateSizeInBytesUpperBound ( magic ()、 recordsBuilder . compressionType ()、 key 、 value 、 headers )); ... FutureRecordMetadata future = new FutureRecordMetadata ( this.produceFuture 、 this.recordCount 、 timestamp 、 checksum 、 key == null ? - 1 : key.length 、 value == null ? - 1 : value.length 、 Time.SYSTEM ) ; ドスン。 add ( 新しいThunk ( コールバック、 未来)); これ。 レコード数++ ; 将来を返します。 } } このメソッドは主にメッセージを追加するために使用され、主に次の 6 つのことを行います。 - MemoryRecordsBuilder の hasRoomFor() を使用して、現在の ProducerBatch に今回書き込まれたレコードを格納するのに十分なスペースがあるかどうかを確認します。
- MemoryRecordsBuilder.append() メソッドを呼び出して、 Record を ByteBuffer に追加します。
- このレコードの送信に対応する、最下層の Future インターフェースを継承するFutureRecordMetadata オブジェクトを作成します。
- Future とメッセージ コールバックを Thunk オブジェクトにカプセル化し、それを thunks コレクションに配置します。
- レコード番号を更新します。
- FutureRecordMetadata を返します。
このメソッドでは、プロデューサーのメイン スレッドがメッセージ キャッシュを完了することしかできず、実際のネットワーク送信は実装されないことがわかります。 次に、JDK で並行 Future インターフェースを実装する FutureRecordMetadata を簡単に見てみましょう。 ProduceRequestResult オブジェクトを維持することに加えて、relativeOffset などのフィールドも維持します。relativeOffset は、ProducerBatch 内の対応する Record のオフセットを記録するために使用されます。 このクラスには、get() と value() という 2 つの注目すべきメソッドがあります。 パブリックRecordMetadata get ( long timeout 、 TimeUnit unit ) は、 InterruptedException 、 ExecutionException 、 TimeoutException をスローします{ ... ブール値が発生しました= this 。 結果。 await ( タイムアウト, 単位); ... 戻り値またはエラー(); } RecordMetadata valueOrError () がExecutionException をスローします{ ... 戻り値(); } このメソッドは主に ProduceRequestResult の CountDown に依存してブロッキング待機を実装し、最後に value() を呼び出して RecordMetadata オブジェクトを返します。 レコードメタデータ値(){ ... 新しいRecordMetadata を返す( 結果。 トピックパーティション(), ...); } プライベートロングタイムスタンプ(){ 結果を返します。 LogAppendTime () を持っていますか? 結果。 logAppendTime (): タイムスタンプを作成します。 } このメソッドは主に、さまざまなパラメータを RecordMetadata オブジェクトにカプセル化して返します。 ProducerBatch がデータを書き込む方法がわかったので、done() メソッドを見てみましょう。プロデューサーがブローカーから「正常」などの応答を受信した場合 | 「タイムアウト」| 「異常」 | 「プロデューサーを閉じる」と、ProducerBatch の done() メソッドが呼び出されます。 (3) 完了() パブリックブール値done ( long baseOffset 、 long logAppendTime 、 RuntimeException 例外) { 最終的なFinalState tryFinalState = ( 例外== null ) ? 最終状態。 成功しました: FinalState 。 失敗した; .... if ( this . finalState . compareAndSet ( null 、 tryFinalState )) { completeFutureAndFireCallbacks ( baseOffset 、 logAppendTime 、 exception ); true を返します。 } .... false を返します。 } このメソッドは主に、コールバック操作を実行できるかどうかを判断するために使用されます。つまり、バッチ応答を受信した後、バッチの最終ステータスに基づいてコールバック操作を実行できるかどうかを判断します。 (4) 完全なFutureAndFireCallbacks() プライベートvoid completeFutureAndFireCallbacks ( long baseOffset 、 long logAppendTime 、 RuntimeException 例外) { 未来を創る。 ( baseOffset 、 logAppendTime 、 exception ) を設定します。 for ( サンクサンク: サンク) { 試す{ if ( 例外== null ) { RecordMetadata メタデータ= サンク。 未来。 価値(); if ( thunk . callback != null ) ドスン。 折り返し電話。 onCompletion ( メタデータ、 null ); } それ以外{ if ( thunk . callback != null ) ドスン。 折り返し電話。 onCompletion ( null 、 例外); } } .... } 未来を創る。 終わり(); } このメソッドは主にコールバック メソッドを呼び出して future を完了するために使用され、主に次の 3 つのことを行います。 - 基本的な変位、メッセージ追加時間、例外など、ProduceRequestResult の関連フィールドを更新します。
- thunks コレクションを走査し、各レコードの Callback コールバックをトリガーします。
- 基になる CountDownLatch.countDown() メソッドを呼び出し、メイン スレッドをブロックします。
これまで、ProducerBatch の最も重要な 3 つの方法、「メッセージをキャッシュする方法」、「応答を処理する方法」、「コールバックを処理する方法」について説明しました。 キャッシュされたメッセージの保存構造を図で説明してみましょう。 次に、Kafka のプロダクション側で最も古典的な「バッファ プール アーキテクチャ」を見てみましょう。 3. クライアント キャッシュ プール アーキテクチャ設計クライアントにキャッシュ プールの従来のアーキテクチャ設計が必要なのはなぜですか? 主な理由は、ProducerBatch を頻繁に作成してリリースすると Full GC の問題が発生するため、Kafka ではこの問題に対する非常に優れたメカニズムである「バッファ プール BufferPool メカニズム」を実装していることです。つまり、各バッチの最下層は、メッセージを格納するために特別に使用され、使用後に返すことができるメモリ空間の一部に対応します。 次に、キャッシュプールのソースコード設計を見てみましょう。 1. バッファプール x 1パブリッククラス BufferPool { 2 プライベート最終長い合計メモリ; 34 プライベート最終intプール可能なサイズ; 56 プライベート最終ReentrantLockロック; 78 プライベート最終 Deque<ByteBuffer> を解放します。 910 プライベート最終 Deque<Condition> waiters; 1112 プライベート long nonPooledAvailableMemory; 1314 パブリック BufferPool(long メモリ、int poolableSize、メトリック metrics、Time time、String metricGrpName) { 15 ... 1617 this.totalMemory = メモリ; 1819 this.nonPooledAvailableMemory = メモリ; 20 } 21 } 上記の重要なフィールドを見てみましょう。 - totalMemory : BufferPool 全体のメモリ サイズ " buffer.memory "。デフォルトは 32M です。
- poolableSize : プールされたキャッシュ プール内のメモリ ブロックのサイズ ( batch.size )。デフォルトは 16k です。
- lock : 複数のスレッドが同時に ByteBuffer を割り当ててリサイクルする場合、スレッドの安全性を確保するためにロックを使用して同時実行を制御します。
- free : 指定されたサイズの ByteBuffer オブジェクトをキャッシュするプールされた空きキュー。
- waiters : ブロックされたスレッドに対応する条件キュー。スレッドが十分なメモリを適用できない場合、そのスレッドはブロックされ、他のスレッドがメモリを解放するまで待機します。対応する Condition オブジェクトがキューに入ります。
- nonPooledAvailableMemory : プールされていない使用可能なメモリ。
固定サイズ「poolableSize 16k」のByteBufferのみを管理していることがわかります。 ArrayDeque の初期化サイズは 16 です。この時点で、BufferPool の状態は次のようになります。 次に、BufferPool の重要なメソッドを見てみましょう。 (1) 割り当てる() パブリックByteBuffer 割り当て( int サイズ、 long maxTimeToBlockMs )はInterruptedException をスローします{ if ( サイズ> this . totalMemory ) throw new IllegalArgumentException ( "" + size + " バイトを割り当てようとしましたが、 メモリ割り当てには" + this . totalMemory + " というハード制限があります。" ); ByteBuffer バッファ= null ; これ。 ロック。 ロック(); if ( this . closed ) { これ。 ロック。 ロックを解除します(); 新しいKafkaException をスローします( "メモリの割り当て中にプロデューサーが閉じられました" ); } .... 試す{ if ( size == poolableSize && ! this . free . isEmpty ()) これを返します。 無料。 ポーリングファースト(); int freeListSize = freeSize () * これ。 プール可能なサイズ;
if ( this . nonPooledAvailableMemory + freeListSize >= size ) { freeUp ( サイズ); this .nonPooledAvailableMemory -= サイズ; } それ以外{ int 累積= 0 ; 条件moreMemory = this 。 ロック。 新しい条件(); 試す{ 残りのTimeToBlockNs = TimeUnit が長いです。 ミリ秒。 ナノ秒数( ブロック時間の最大数) これ。 ウェイター。最後にメモリを追加します。 while ( 累積< サイズ) { .... 試す{ 待機時間経過= ! より多くのメモリ。 await ( 残りのTimeToBlockNs 、 TimeUnit . NANOSECONDS ); ついに .... } .... if ( accumulated == 0 && size == this.poolableSize && ! this.free.isEmpty ( ) ) { バッファ= this 。 無料。 ポーリングファースト(); 累積= サイズ; } それ以外{ freeUp ( サイズ- 累積); int got = ( int ) 数学. min ( サイズ- 累積、 this . nonPooledAvailableMemory ); this .nonPooledAvailableMemory -= 取得しました; 蓄積+= 得た; } } 累積= 0 ; ついに this .nonPooledAvailableMemory += 蓄積されます; これ。 ウェイター。 削除( moreMemory ); } } ついに 試す{ if ( ! ( this . nonPooledAvailableMemory == 0 && this . free . isEmpty ()) && ! this . waiters . isEmpty ())
これ。 ウェイター。 peekFirst ()。 シグナル(); ついに
ロック。 ロックを解除します(); } } if ( バッファ== null ) safeAllocateByteBuffer ( サイズ) を返します。 それ以外 バッファを返します。 } プライベートByteBuffer safeAllocateByteBuffer ( int size ) { ブールエラー= true ; 試す{ ByteBuffer バッファ= allocateByteBuffer ( サイズ); エラー= false ; バッファを返します。 ついに if ( エラー) { これ。 ロック。 ロック(); 試す{ this .nonPooledAvailableMemory += サイズ; if ( ! this . waiters . isEmpty ())
これ。 ウェイター。 peekFirst ()。 シグナル(); ついに
これ。 ロック。 ロックを解除します(); } } } } 保護されたByteBuffer allocateByteBuffer ( int size ) { ByteBuffer を返します。 割り当てる( サイズ); } プライベートvoid freeUp ( int サイズ){ while ( ! this . free . isEmpty () && this . nonPooledAvailableMemory < size ) これ。 nonPooledAvailableMemory += これ。 無料。 pollLast () を実行します。 容量(); } このメソッドは主に ByteBuffer を割り当てるために使用されます。ここに4つのケースがあります: ケース1: 16kを申請し、空きキャッシュプールに利用可能なメモリがあるこのとき、キューの先頭にある ByteBuffer は、空きバッファ プールから直接取得され、割り当てられて使用されます。使用後、ByteBuffer は空きバッファ プールのキューの末尾に直接配置され、次回再利用できるように clear() が呼び出されてデータがクリアされます。 ケース2: 16k を要求しているが、空きキャッシュ プールに空きメモリがない現時点では、空きバッファ プールに使用可能なメモリはなく、プールされていない使用可能なメモリからのみ 16k のメモリを割り当てることができます。使用後、ByteBuffer は空きバッファ プールの末尾に直接配置され、次回再利用できるように clear() が呼び出されてデータがクリアされます。 ケース3: 16k以外のサイズを要求し、空きキャッシュプールに空きメモリがない現時点では、空きキャッシュ プールに使用可能なメモリがなく、要求された量は 16k ではありません。割り当てのために、プールされていない使用可能なメモリ (割り当てに十分なスペース) からメモリの一部のみを取得できます。使用後、要求されたメモリ領域はプールされていない使用可能なメモリに直接解放され、後で GC されます。 ケース4: 16k以外の空きキャッシュプールを要求し、使用可能なメモリがあるが、プールされていない使用可能なメモリが不足している現時点では、空きキャッシュ プールには使用可能なメモリがありますが、要求されたサイズは 16k ではありません。まず、要求されたメモリ サイズが満たされるまで、空きキャッシュ プールからプールされていない使用可能なメモリに ByteBuffer を解放しようとします。次に、割り当て可能な非プールメモリから対応するメモリ サイズを取得します。使用後は、要求されたメモリ領域をプールされていない使用可能なメモリに直接解放し、後で GC されます。 (2)割り当て解除() パブリックvoid deallocate ( ByteBuffer バッファ、 int サイズ) { ロック。 ロック(); 試す{ if ( size == this.poolableSize && size == buffer.capacity () ) { バッファ。 クリア(); これ。 無料。 ( バッファ) を追加します。 } それ以外{ this .nonPooledAvailableMemory += サイズ; } 条件moreMem = this 。 ウェイター。 ピークファースト(); ( moreMem != null ) の場合 もっと見るシグナル(); ついに ロック。 ロックを解除します(); } } このメソッドは主に ByteBuffer スペースを解放するために使用され、主に次の処理が行われます。 - スレッドの安全性を確保するために最初にロックします。
- 解放するサイズが 16k の場合、直接フリー キューに入れられます。
- そうでない場合、JVM GC は ByteBuffer を再利用し、nonPooledAvailableMemory を増加させます。
- ByteBuffer がリサイクルされると、ウェイター内の最初のブロックされたスレッドを起動します。
最後に、「読み取りと書き込みの分離シナリオ」をサポートする、Kafka の CopyOnWriteMap のカスタム実装を見てみましょう。 2. コピーオンライトマップRecordAccumulator クラスのプロパティ フィールドから、CopyOnWriteMap のキーはトピック パーティションであり、値はこのパーティションに送信された Deque<ProducerBatch> キュー コレクションであることがわかります。 メッセージを生成するときに、送信されるパーティションはほとんど変更されないため、書き込み操作はほとんど発生しないことがわかっています。ほとんどの場合、パーティションに対応するキューが最初に取得され、次に ProducerBatch がキューの最後に配置されるため、読み取り操作が非常に頻繁に行われます。これは典型的な「もっと読んで、もっと少なく書く」シナリオです。 いわゆる「CopyOnWrite」とは、書き込み時にコピーが作成され、書き込み後に元のコレクションが置き換えられることを意味します。 ソースコードの実装を見てみましょう。 パブリッククラスCopyOnWriteMap < K , V >はConcurrentMap < K , V > を実装します。 プライベートvolatile Map < K , V > マップ; パブリックCopyOnWriteMap (){ これ。 map = コレクション。 空のマップ(); } このクラスには、"volatile" によって変更される重要なフィールド Map が 1 つだけあります。目的は、マルチスレッドのシナリオで Map が変更されたときに、他のスレッドが見えるようになることです。 次に、比較的単純ですが、実装が非常に古典的ないくつかの重要な方法を見てみましょう。 (1) 取得() パブリックV get ( オブジェクトk ) { マップを返します。 取得( k ); } このメソッドは主にコレクション内のキューを読み取るために使用されます。読み取り操作がロックされず、マルチスレッドの同時読み取りシナリオがブロックされず、高い同時読み取りを実現できることがわかります。キューがすでに存在する場合は、直接返します。 (2) 不在の場合に() パブリック同期V putIfAbsent ( K k 、 V v ) { if ( ! containsKey ( k )) put ( k , v ) を返します。 それ以外 get ( k ) を返します。 } パブリックブール値のcontainsKey ( オブジェクトk ){ マップを返します。 キーを含む( k ); } このメソッドは主にキューを取得または設定するために使用されます。複数のスレッドによって同時に実行されます。 「synchronized」で変更すると、スレッドの安全性を確保できます。キューが存在しない限り、キューは設定されません。 (3)置く() パブリック同期V put ( K k 、 V v ) { Map < K , V > コピー= new HashMap < K , V > ( this . map ); V 前= コピー。 ( k , v ) を置く。 これ。 map = コレクション。 変更不可能なマップ ( コピー); 前を返す; } このメソッドは主にキューを設定するために使用されます。配置時に、同時に 1 つのスレッドだけがこの値を更新するように、「同期」によっても変更されます。 では、なぜ書き込み操作は読み取り操作をブロックしないのでしょうか? - まず、HashMap コレクションのコピーを再作成します。
- 「 volatile 」書き込みを通じて対応するセットに値を割り当てます。
- 新しいコレクションを「変更不可能なマップ」に設定し、フィールド マップに割り当てます。
これにより、読み取りと書き込みの分離が実現されます。 Producer の最も重要な部分は、複数のスレッドによって同時にアクセスされるキャッシュ プールです。したがって、この領域での高並行性設計は非常に重要です。 IV.結論ここで、この記事の要点をまとめてみましょう。 1. Kafka クライアントからバッチ メッセージを送信する利点を見てみましょう。 2. この記事では、実際のシナリオのアナロジーを使用して RecordAccumulator の内部構造を理解し、クライアントでメッセージがキャッシュされる方法と各内部コンポーネントの実装原則を詳細に分析します。 3. Kafka クライアントで非常に重要な BufferPool と CopyOnWriteMap の実装原則を詳しく分析します。 |