Kafka ソースコード実装メカニズムのクライアントキャッシュアーキテクチャ設計の図解説明

Kafka ソースコード実装メカニズムのクライアントキャッシュアーキテクチャ設計の図解説明

みなさんこんにちは。Hua Zaiです。またお会いできて嬉しいです。

前回の記事では主に「Kafka ネットワーク層の送受信全体のプロセス」を深く分析しました。今日は主に「Kafka クライアント メッセージ キャッシュ アーキテクチャ設計」について説明し、メッセージがどのようにキャッシュされるかを詳しく分析します。

この記事を注意深く読むと、Kafka クライアント キャッシュ アーキテクチャのソース コードに対する理解が深まると思います。

この記事には役立つ情報が沢山含まれているので、じっくり読んでいただければ幸いです。

I. 全体概要

シナリオ駆動型アプローチでは、送信されたメッセージがネットワーク要求を通じてカプセル化されると、NIO マルチプレクサはネットワークの読み取りおよび書き込みイベントをリッスンし、ネットワーク メッセージ送受信を行います。クライアント上でメッセージがどのようにキャッシュされるかを見てみましょう。

Kafka が超高スループットのメッセージング システムであることは誰もが知っていますが、これは主に「非同期送信」、「バッチ送信」、「メッセージ圧縮」に反映されています。

この記事に関連するのは「バッチ送信」です。つまり、プロデューサーがメッセージをキャッシュし、特定の条件が満たされると、送信者サブスレッドがメッセージをバッチで Kafka ブローカーに送信します。

これによる利点は、「ネットワーク要求の数を最小限に抑え、ネットワーク スループットを向上させること」です。

誰もが理解しやすいように、すべてのソースコードの骨組みのみが保持されます。

2. メッセージはクライアント側でどのようにキャッシュされますか?

バッチで送信されるため、メッセージをキャッシュする必要があります。では、メッセージはどこにキャッシュされるのでしょうか?どのように管理されていますか?

以下の簡略化されたフローチャートからわかるように、送信されるメッセージは主に RecordAccumulator にキャッシュされます。

実際のシナリオに例えて説明すると、理解しやすくなります。

RecordAccumulator はメッセージを蓄積する倉庫のようなものなので、宅配便の倉庫の例えを使ってみましょう。

上の写真は商品が満載の急行倉庫です。仕分け担当者が、さまざまな目的地の荷物を目的地に対応する貨物箱に入れ、いっぱいになった箱を対応するエリアに置いている様子を見ることができます。

次に、ソーターは RecordAccumulator を参照し、ボックスとそれぞれのスタッキング領域は、RecordAccumulator 内でメッセージがキャッシュされる場所です。封印された箱はすべて、送り主が受け取って発送するまで待機します。

上の図を理解すれば、RecordAccumulator のアーキテクチャ設計と動作ロジックを大まかに理解できるようになります。

倉庫にあるものを要約すると次のようになります。

  • ソーター
  • 行き先
  • カーゴボックス
  • スタッキングエリア

これらの概念を覚えておいてください。これらはソース コードに反映されます。プロセスは次の図に示されています。

上の図からわかるように、

  • 少なくとも 1 つのビジネス メイン スレッドと 1 つの送信側スレッドが RecordAccumulator を同時に操作するため、スレッドセーフである必要があります。
  • その中には、ConcurrentMap コレクション「 Kafka カスタマイズされた CopyOnWriteMap 」があります。キー: TopicPartiton、値: Deque<ProducerBatch>、つまり、トピック パーティションを単位として、メッセージは ProducerBatch に蓄積されキャッシュされ、複数の ProducerBatch が Deque キューに格納されます。 Deque 内の最新のバッチがメッセージを収容できない場合、キャッシュを続行するために新しいバッチが作成され、Deque に追加されます。
  • Kafka は、メモリ破壊の頻繁な適用によって発生する Full GC 問題を軽減するために、ProducerBatch を介してデータをキャッシュする古典的な「 BufferPool メカニズム」を設計しています。

要約すると、RecordAccumulator クラスには、「ProducerBatch」、「Custom CopyOnWriteMap」、および「BufferPool メカニズム」という 3 つの重要なコンポーネントがあることがわかります。

スペースの制約により、RecordAccumulator クラスについては次の記事で説明します。

まず、メッセージのキャッシュと送信の最小単位である ProducerBatch について見てみましょう。

github ソースコードのアドレスは次のとおりです。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/Producer/internals/ProducerBatch.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/ByteBufferOutputStream.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/Producer/internals/ProduceRequestResult.java。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/Producer/internals/FutureRecordMetadata.java。

呼び出し関係から、ProducerBatch は MemoryRecordsBuilder に依存し、MemoryRecordsBuilder は MemoryRecords に依存していることがわかります。つまり、「MemoryRecords はメッセージが実際に保存される場所です」。

1. メモリレコード

import java.nio.ByteBuffer; public class MemoryRecords extends AbstractRecords { public static MemoryRecordsBuilder builder(..){ // 重载builder return builder(...); } public static MemoryRecordsBuilder builder( ByteBuffer buffer, // 消息版本byte magic, // 消息压缩类型CompressionType compressionType, // 时间戳TimestampType timestampType, // 基本位移long baseOffset, // 日志追加时间long logAppendTime, // 生产者id long producerId, // 生产者版本short producerEpoch, // 批次序列号int baseSequence, boolean isTransactional, // 是否是控制类的批次boolean isControlBatch, // 分区leader的版本int partitionLeaderEpoch) { // 初始化MemoryRecordsBuilder类return new MemoryRecordsBuilder(...); } }

このクラスは比較的簡単です。ビルダー メソッドから、メッセージの保存に ByteBuffer に依存していることがわかります。 MemoryRecordsBuilder クラスの構築は、MemoryRecords.builder() を通じて初期化されます。

MemoryRecordsBuilder クラスの実装を見てみましょう。

2. メモリレコードビルダー

public class MemoryRecordsBuilder implements AutoCloseable { // 写操作关闭的输出流private static final DataOutputStream CLOSED_STREAM = new DataOutputStream(new OutputStream() { // 当向某个ByteBuffer关闭输出流写数据时抛异常public void write(int b) { throw new ...; } }); // 日志时间private final TimestampType timestampType; // 消息压缩类型private final CompressionType compressionType; // kafka对OutputStream接口的实现类,对ByteBuffer实现了自动扩容功能private final ByteBufferOutputStream bufferStream; // 消息的版本private final byte magic; // ByteBuffer的最初始位置private final int initialPosition; // 基本位移private final long baseOffset; // 消息追加的时间private final long logAppendTime; // 是否是控制类的批次private final boolean isControlBatch; // 分区leader的版本private final int partitionLeaderEpoch; // 写入上限private final int writeLimit; // batch头大小字节数private final int batchHeaderSizeInBytes; // 评估压缩率private float estimatedCompressionRatio = 1.0F; // 对bufferStream添加压缩功能private DataOutputStream appendStream; // 是否是事务批次private boolean isTransactional; // 生产者id private long producerId; // 生产者版本private short producerEpoch; // 批次序列号private int baseSequence; // 压缩前要写入的消息体大小字节数private int uncompressedRecordsSizeInBytes = 0; // 压缩前写入的记录数(不包括头) private int numRecords = 0; // 实际压缩率private float actualCompressionRatio = 1; // 最大时间戳private long maxTimestamp = RecordBatch.NO_TIMESTAMP; // 最大时间戳偏移量private long offsetOfMaxTimestamp = -1; // 最后的偏移量private Long lastOffset = null; // 第一次追加消息的时间戳private Long firstTimestamp = null; // 真正保存消息的地方private MemoryRecords builtRecords;

このカテゴリには多くのフィールドがありますが、ここではバイト ストリームに関する 2 つのフィールドについてのみ説明します。

  • CLOSED_STREAM : ByteBuffer が閉じられると、対応する書き込み出力ストリームも CLOSED_STREAM に設定されます。目的は、ByteBuffer にデータが書き込まれるのを防ぐことです。それ以外の場合は例外がスローされます。
  • bufferStream : まず、MemoryRecordsBuilder はメッセージの保存を完了するために ByteBuffer に依存します。 ByteBuffer を ByteBufferOutputStream にカプセル化し、Java NIO の OutputStream を実装して、データをストリーム形式で書き込むことができるようにします。同時に、 ByteBufferOutputStream はByteBuffer を自動的に拡張する機能を提供します。

初期化構築方法を見てみましょう。

 public MemoryRecordsBuilder(ByteBuffer buffer,...) { // 将MemoryRecordsBuilder关联的ByteBuffer封装成ByteBufferOutputStream流this(new ByteBufferOutputStream(buffer), ...); } // 构造方法public MemoryRecordsBuilder( ByteBufferOutputStream bufferStream, ... int writeLimit) { .... // 初始位置this.initialPosition = bufferStream.position(); // 1. 根据不同消息版本计算批次Batch头的长度this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType); // 2. 调整对应的position bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; // 3. 在bufferStream流外层套一层压缩流,再套一层DataOutputStream流this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); } }

コンストラクターからわかるように、基本フィールドの割り当てに加えて、次の 3 つのことが行われます。

  • バッチ ヘッダーのサイズと長さは、メッセージのバージョンと圧縮タイプに基づいて計算されます。
  • bufferStream の位置を調整して Batch ヘッダーの位置をスキップすることで、メッセージを直接書き込むことができます。
  • bufferStream に圧縮機能を追加します。

これを見ると非常に興味深いです。読者は、ここで「ByteBuffer」、「bufferStream」、および「appendStream」が関係していることに気付いているでしょうか。

3 つの関係は、「デコレータ パターン」を通じて実現されます。つまり、bufferStream は ByteBuffer を装飾して拡張機能を実現し、appendStream は bufferStream を装飾して圧縮機能を実現します。

そのコアメソッドを見てみましょう。

(1) オフセット付き追加()

 // 追加新记录public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers); } // 计算下一个连续偏移量private long nextSequentialOffset() { // lastOffset用来记录当前写入Record的offset,每次当有新Record写入时,都会递增它。 return lastOffset == null ? baseOffset : lastOffset + 1; } // 根据偏移量追加消息private Long appendWithOffset( long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { try { // 检查isControl标志是否一致if (isControlRecord != isControlBatch) throw new ...; // 保证offset是递增的if (lastOffset != null && offset <= lastOffset) throw new ...; // 检查时间戳if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) throw new ...; // 只有V2版本才有header if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new ...; // 更新firstTimestamp if (firstTimestamp == null) firstTimestamp = timestamp; // V2版本消息写入if (magic > RecordBatch.MAGIC_VALUE_V1) { appendDefaultRecord(offset, timestamp, key, value, headers); return null; } else { //V0、V1 版本消息写入(此处不进行剖析) return appendLegacyRecord(offset, timestamp, key, value, magic); } } catch (IOException e) { // 抛异常} }

このメソッドは主にオフセットに従ってメッセージを追加するために使用され、メッセージのバージョンに従って対応するメッセージを書き込みます。ただし、ProducerBatch はバージョン V2 に対応していることは明らかです。

V2 バージョンのメッセージ書き込みロジックを見てみましょう。

 private void appendDefaultRecord( long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { // 1. 检查appendStream状态是否可以写ensureOpenForRecordAppend(); // 2. 计算写入多少偏移量int offsetDelta = (int) (offset - baseOffset); // 3.计算本次写与第一次写之间时间差long timestampDelta = timestamp - firstTimestamp; // 4.使用DefaultRecord.writeTo()方法会按照V2 版本格式写入appendStream流中,并返回压缩前的消息大小int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); // 5. 消息写入成功后更新RecordBatch的元信息recordWritten(offset, timestamp, sizeInBytes); } // 判断appendStream状态是否为CLOSED_STREAM private void ensureOpenForRecordAppend() { if (appendStream == CLOSED_STREAM) throw new ...; } // 消息写入成功后更新RecordBatch的元信息private void recordWritten(long offset, long timestamp, int size) { .... // 压缩前写入的记录数+ 1 numRecords += 1; // 压缩前要写入的消息体大小字节数+ size uncompressedRecordsSizeInBytes += size; // 最后的偏移量+ offset lastOffset = offset; if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) { // 赋值最大时间戳maxTimestamp = timestamp; // 赋值最大时间戳偏移量offsetOfMaxTimestamp = offset; } }

このメソッドは主に V2 バージョン メッセージを書き込むために使用され、主に次の 5 つのことを行います。

  • 書き込み可能かどうかを確認します。appendStream の状態が CLOSED_STREAM であるかどうかを判断します。そうでない場合は書き込み可能であり、そうでない場合は例外がスローされます。
  • 今回どれだけのオフセットを書き込むか計算します。
  • この書き込みと最初の書き込みの間の時間差を計算します。
  • メッセージを V2 形式で appendStream ストリームに書き込み、圧縮前のメッセージのサイズを返します。
  • 成功した場合、RecordBatch のメタデータが更新されます。

(2) 部屋がある()

 public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { // 检查两个状态// (1)appendStream流状态// (2)当前已经写入的预估字节数是否超过了writeLimit写入上限if (isFull()) return false; // 每个RecordBatch至少可以写入一个Record,此时如果一个Record都没有,则可以继续写入if (numRecords == 0) return true; final int recordSize; if (magic < RecordBatch.MAGIC_VALUE_V2) { // 预估V0、V1旧版本的Record大小recordSize = Records.LOG_OVERHEAD + LegacyRecord.recordSize(magic, key, value); } else { // 预估V2版本写入的Record大小int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1); ... recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, headers); } // 已写入字节数+ 本次写入Record的预估字节数不能超过writeLimit写入上限return this.writeLimit >= estimatedBytesWritten() + recordSize; } public boolean isFull() { return appendStream == CLOSED_STREAM || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); }

このメソッドは主に、現在の MemoryRecordsBuilder に書き込むレコードを収容するスペースがあるかどうかを推定するために使用され、以下の ProducerBatch.tryAppend() で呼び出されます。

最後に、冒頭で触れた自動拡張機能について見てみましょう。

(3) バッファ拡張()

 public class ByteBufferOutputStream extends OutputStream { // 扩容因子1.1倍private static final float REALLOCATION_FACTOR = 1.1f; // 初始容量private final int initialCapacity; // 初始位置private final int initialPosition; // 计算是否需要扩容public void ensureRemaining(int remainingBytesRequired) { // 当写入字节数大于buffer当前剩余字节数就开启扩容if (remainingBytesRequired > buffer.remaining()) expandBuffer(remainingBytesRequired); } // 扩容private void expandBuffer(int remainingRequired) { // 1. 评估需要多少空间int expandSize = Math.max((int) (buffer.limit() * REALLOCATION_FACTOR), buffer.position() + remainingRequired); // 2. 申请新的ByteBuffer ByteBuffer temp = ByteBuffer.allocate(expandSize); // 3. 获取写入上限int limit = limit(); // 4. 写状态转换为读状态buffer.flip(); // 5. 将buffer读到新申请的temp里temp.put(buffer); // 6. 修改写模式的limit上限buffer.limit(limit); // 7. 更新原来的buffer的position,防止被重复消费buffer.position(initialPosition); // 8. 将引用指向新申请的ByteBuffer buffer = temp; } }

このメソッドは主に、ByteBuffer を拡張する必要があるかどうかを判断するために使用されます。つまり、書き込まれたバイト数がバッファ内に残っている現在のバイト数よりも大きい場合、拡張が開始されます。拡張には次の 3 つが必要です。

  • 必要なスペースの量を評価する: 「拡張スペース」と「実際に必要なバイト数」の間の最大値を取得します。ここで「拡張係数」が計算に使用されるのは、主に拡張にはシステム リソースの消費が必要になるためです。毎回実際のデータ サイズに応じてスペースを割り当てると、不要なシステム リソースが浪費されます。
  • 新しいスペースの適用: 拡張量に基づいて新しい ByteBuffer を申請し、ソース コードの手順「3 - 7」に対応して、元の ByteBuffer データをそこにコピーします。
  • 最後に、参照は新しく適用された ByteBuffer を指します。

次に、ProducerBatch の実装を見てみましょう。

3. プロデューサーバッチ

public final class ProducerBatch { // 批次最终状态private enum FinalState { ABORTED, FAILED, SUCCEEDED } // 批次创建时间final long createdMs; // 批次对应的主题分区final TopicPartition topicPartition; // 请求结果的future final ProduceRequestResult produceFuture; // 用来存储消息的callback和响应数据private final List<Thunk> thunks = new ArrayList<>(); // 封装MemoryRecords对象,用来存储消息的ByteBuffer private final MemoryRecordsBuilder recordsBuilder; // batch的失败重试次数private final AtomicInteger attempts = new AtomicInteger(0); // 是否是被分裂的批次private final boolean isSplitBatch; // ProducerBatch的最终状态private final AtomicReference<FinalState> finalState = new AtomicReference<>(null); // Record个数int recordCount; // 最大Record字节数int maxRecordSize; // 最后一次失败重试发送的时间戳private long lastAttemptMs; // 最后一次向该ProducerBatch追加Record的时间戳private long lastAppendTime; // Sender子线程拉取批次的时间private long drainedMs; // 是否正在重试过,如果ProducerBatch中的数据发送失败,则会重新尝试发送private boolean retry; } // 构造函数public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long createdMs, boolean isSplitBatch) { ... // 请求结果的future this.produceFuture = new ProduceRequestResult(topicPartition); ... }

ProducerBatch は、通常「バッチ メッセージ」と呼ばれる 1 つ以上のメッセージを格納します。

いくつかの重要なフィールドを見てみましょう。

  • topicPartition : バッチに対応するトピック パーティション。現在の ProducerBatch にキャッシュされているすべてのレコードがこの TopicPartition に送信されます。
  • produceFuture : ProduceRequestResult クラスによって実装される、リクエスト結果の Future。
  • thunks : 各レコードに関連付けられたメッセージ コールバックと機能応答データを格納するために使用される Thunk オブジェクトのコレクション。
  • recordsBuilder : MemoryRecords オブジェクトをカプセル化し、メッセージの ByteBuffer を保存するために使用されます。
  • 試行回数: バッチの失敗した再試行回数。 AtomicInteger は、Integer を使用するためのアトミック操作を提供し、高並行性が必要な状況での使用に適しています
  • isSplitBatch : 分割バッチであるかどうか。これは、単一のメッセージが 1 つの ProducerBatch に格納するには大きすぎるため、複数の ProducerBatch に分割されて格納される場合です。
  • dawnedMs : Sender 子スレッドがバッチをプルするのにかかる時間。
  • retry : ProducerBatch 内のデータの送信に失敗した場合、再度送信を試みます。

コンストラクタでは、重要な依存コンポーネントとして「メッセージ生成結果を非同期に取得するためのクラス」である「ProduceRequestResult」があります。簡単に分析してみましょう。

(1) ProduceRequestResultクラス

public class ProduceRequestResult { // 通过一个count为1的CountDownLatch对象间接地实现了Future的功能。 private final CountDownLatch latch = new CountDownLatch(1); private final TopicPartition topicPartition; // 用来记录broker端关联ProducerBatch中第一条Record分配的offset值// 这样每个Record的真实offset就可以根据自身在ProducerBatch的位置计算出来了(baseOffset + relativeOffset) private volatile Long baseOffset = null; // 构造函数public ProduceRequestResult(TopicPartition topicPartition) { this.topicPartition = topicPartition; } // 当等到响应会会调该函数唤醒阻塞的主线程public void done() { if (baseOffset == null) throw new ...; this.latch.countDown(); } // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行public void await() throws InterruptedException { latch.await(); } // 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return latch.await(timeout, unit); } }

このクラスは、CountDownLatch(1) を通じて Future 関数を間接的に実装し、他のすべてのスレッドがこのロックを待機するようにします。現時点では、他のすべての待機中のスレッドの実行を同時に再開するには、countDown() メソッドを 1 回呼び出すだけで済みます。

Producer がメッセージを送信すると、間接的に "ProduceRequestResult.await" が呼び出され、スレッドはサーバーからの応答を待機します。サーバーが応答すると、「ProduceRequestResult.done」が呼び出され、「CountDownLatch.countDown」が呼び出され、「CountDownLatch.await」でブロックされているメインスレッドが起動します。これらのスレッドは、ProduceRequestResult のエラー フィールドを使用して、要求が成功したか失敗したかを判断できます。

次に、ProducerBatch クラスの重要なメソッドを見てみましょう。

(2) トライアペンド()

 public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { // 1.检查MemoryRecordsBuilder是否还有空间写入if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) { return null; } else { // 2.调用append()方法写入Record Long checksum = this.recordsBuilder.append(timestamp, key, value, headers); // 3. 更新最大Record字节数this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),recordsBuilder.compressionType(), key, value, headers)); ... // 4.构建FutureRecordMetadata对象FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,timestamp, checksum,key == null ? -1 : key.length,value == null ? -1 : value.length, Time.SYSTEM); // 5. 将Callback和FutureRecordMetadata记录到thunks集合中thunks.add(new Thunk(callback, future)); // 6. 更新Record记录数this.recordCount++; // 7. 返回FutureRecordMetadata return future; } }

このメソッドは主にメッセージを追加するために使用され、主に次の 6 つのことを行います。

  • MemoryRecordsBuilder の hasRoomFor() を使用して、現在の ProducerBatch に今回書き込まれたレコードを格納するのに十分なスペースがあるかどうかを確認します。
  • MemoryRecordsBuilder.append() メソッドを呼び出して、レコードを ByteBuffer に追加します。
  • このレコードの送信に対応する、最下層の Future インターフェースを継承するFutureRecordMetadata オブジェクトを作成します
  • Future とメッセージ コールバックを Thunk オブジェクトにカプセル化し、それを thunks コレクションに配置します。
  • レコード番号を更新します。
  • FutureRecordMetadata を返します。

このメソッドでは、プロデューサーのメイン スレッドがメッセージ キャッシュを完了することしかできず、実際のネットワーク送信は実装されないことがわかります。

次に、JDK で並行 Future インターフェースを実装する FutureRecordMetadata を簡単に見てみましょう。 ProduceRequestResult オブジェクトを維持することに加えて、relativeOffset などのフィールドも維持します。relativeOffset は、ProducerBatch 内の対応する Record のオフセットを記録するために使用されます。

このクラスには、get() と value() という 2 つの注目すべきメソッドがあります。

 public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { ... // 依赖ProduceRequestResult的CountDown来实现阻塞等待boolean occurred = this.result.await(timeout, unit); ... // 调用value()方法返回RecordMetadata对象return valueOrError(); } RecordMetadata valueOrError() throws ExecutionException { ... return value(); }

このメソッドは主に ProduceRequestResult の CountDown に依存してブロッキング待機を実装し、最後に value() を呼び出して RecordMetadata オブジェクトを返します。

 RecordMetadata value() { ... // 将partition、baseOffset、relativeOffset、时间戳(LogAppendTime | CreateTimeStamp)等信息封装成RecordMetadata 对象返回return new RecordMetadata( result.topicPartition(), ...); } private long timestamp() { return result.hasLogAppendTime() ? result.logAppendTime() : createTimestamp; }

このメソッドは主に、さまざまなパラメータを RecordMetadata オブジェクトにカプセル化して返します。

ProducerBatch がデータを書き込む方法がわかったので、done() メソッドを見てみましょう。プロデューサーがブローカーから「正常」などの応答を受信した場合 | 「タイムアウト」| 「異常」 | 「プロデューサーを閉じる」と、ProducerBatch の done() メソッドが呼び出されます。

(3) 完了()

 public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) { // 1.根据exception决定本次ProducerBatch发送的最终状态final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED; .... // 2.通过CAS操作更新finalState状态,只有第一次更新的时候,才会触发completeFutureAndFireCallbacks()方法if (this.finalState.compareAndSet(null, tryFinalState)) { // 3.执行回调completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception); return true; } .... return false; }

このメソッドは主に、コールバック操作を実行できるかどうかを判断するために使用されます。つまり、バッチ応答を受信した後、バッチの最終ステータスに基づいてコールバック操作を実行できるかどうかを判断します。

(4) 完全なFutureAndFireCallbacks()

 private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) { // 1.更新ProduceRequestResult中的相关字段produceFuture.set(baseOffset, logAppendTime, exception); // 2.遍历thunks集合,触发每个Record的Callback回调for (Thunk thunk : thunks) { try { if (exception == null) { // 3.获取消息元数据RecordMetadata metadata = thunk.future.value(); if (thunk.callback != null) //4.调用回调方法thunk.callback.onCompletion(metadata, null); } else { if (thunk.callback != null) // 4.调用回调方法thunk.callback.onCompletion(null, exception); } } .... } // 4.调用底层CountDownLatch.countDown()方法,阻塞在其上的主线程。 produceFuture.done(); }

このメソッドは主にコールバック メソッドを呼び出して future を完了するために使用され、主に次の 3 つのことを行います。

  • 基本的な変位、メッセージ追加時間、例外など、ProduceRequestResult の関連フィールドを更新します。
  • thunks コレクションを走査し、各レコードの Callback コールバックをトリガーします。
  • 基になる CountDownLatch.countDown() メソッドを呼び出し、メイン スレッドをブロックします。

これまで、ProducerBatch の最も重要な 3 つの方法、「メッセージをキャッシュする方法」、「応答を処理する方法」、「コールバックを処理する方法」について説明しました。

キャッシュされたメッセージの保存構造を図で説明してみましょう。

次に、Kafka のプロダクション側で最も古典的な「バッファ プール アーキテクチャ」を見てみましょう。

3. クライアント キャッシュ プール アーキテクチャ設計

クライアントにキャッシュ プールの従来のアーキテクチャ設計が必要なのはなぜですか?

主な理由は、ProducerBatch を頻繁に作成してリリースすると Full GC の問題が発生するため、Kafka ではこの問題に対する非常に優れたメカニズムである「バッファ プール BufferPool メカニズム」を実装していることです。つまり、各バッチの最下層は、メッセージを格納するために特別に使用され、使用後に返すことができるメモリ空間の一部に対応します。

次に、キャッシュプールのソースコード設計を見てみましょう。

1. バッファプール

github ソースコードのアドレスは次のとおりです。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/clients/Producer/internals/BufferPool.java。

 public class BufferPool { // 整个BufferPool总内存大小默认32M private final long totalMemory; // 当前BufferPool管理的单个ByteBuffer大小,16k private final int poolableSize; // 因为有多线程并发分配和回收ByteBuffer,用锁控制并发,保证线程安全。 private final ReentrantLock lock; // 对应一个ArrayDeque<ByteBuffer> 队列,其中缓存了固定大小的ByteBuffer 对象private final Deque<ByteBuffer> free; // 此队列记录因申请不到足够空间而阻塞的线程对应的Condition 对象private final Deque<Condition> waiters; // 非池化可用的内存即totalMemory减去free列表中的全部ByteBuffer的大小private long nonPooledAvailableMemory; // 构造函数public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) { ... // 总的内存this.totalMemory = memory; // 默认的池外内存,就是总的内存this.nonPooledAvailableMemory = memory; } }

上記の重要なフィールドを見てみましょう。

  • totalMemory : BufferPool 全体のメモリ サイズ「buffer.memory」。デフォルトは 32M です。
  • poolableSize : プールされたキャッシュ プール内のメモリ ブロックのサイズ (batch.size)。デフォルトは 16k です。
  • lock : 複数のスレッドが同時に ByteBuffer を割り当ててリサイクルする場合、スレッドの安全性を確保するためにロックを使用して同時実行を制御します。
  • free : 指定されたサイズの ByteBuffer オブジェクトをキャッシュするプールされた空きキュー。
  • waiters : ブロックされたスレッドに対応する条件キュー。スレッドが十分なメモリを適用できない場合、そのスレッドはブロックされ、他のスレッドがメモリを解放するまで待機します。対応する Condition オブジェクトがキューに入ります。
  • Non-Pooledavailablemememory :非プール利用可能なメモリ。

固定サイズの「プール可能な16K」のbytebufferのみを管理することがわかります。 ArrayDequeの初期化サイズは16です。この時点で、BufferPoolのステータスは次のとおりです。

次に、BufferPoolの重要な方法を見てみましょう。

(1)Allocate()

 // 分配指定空间的缓存,如果缓冲区中没有足够的空闲空间,那么会阻塞线程,直到超时或得到足够空间public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { // 1.判断申请的内存是否大于总内存if (size > this.totalMemory) throw new IllegalArgumentException("Attempt to allocate " + size + " bytes, but there is a hard limit of "+ this.totalMemory + " on memory allocations."); // 初始化buffer ByteBuffer buffer = null; // 2.加锁,保证线程安全。 this.lock.lock(); // 如果当前BufferPool处于关闭状态,则直接抛出异常if (this.closed) { this.lock.unlock(); throw new KafkaException("Producer closed while allocating memory"); } .... try { // 3.申请内存大小恰好为16k 且free缓存池不为空if (size == poolableSize && !this.free.isEmpty()) // 从free队列取出一个ByteBuffer return this.free.pollFirst(); // 对于申请内存大小非16k情况// 先计算free缓存池总空间大小,判断是否足够int freeListSize = freeSize() * this.poolableSize; // 4.当前BufferPool能够释放出申请内存大小的空间if (this.nonPooledAvailableMemory + freeListSize >= size) { // 5.如果size大于非池化可用内存大小,就循环从free缓存池里释放出来空闲Bytebuffer补充到nonPooledAvailableMemory中,直到满足size大小为止。 freeUp(size); // 释放非池化可用内存大小this.nonPooledAvailableMemory -= size; } else { // 如果当前BufferPool不够提供申请内存大小,则需要阻塞当前线程// 累计已经释放的内存int accumulated = 0; // 创建对应的Condition,阻塞自己等待别的线程释放内存Condition moreMemory = this.lock.newCondition(); try { // 计算当前线程最大阻塞时长long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); // 把自己添加到等待队列中末尾,保持公平性,先来的先获取内存,防止饥饿this.waiters.addLast(moreMemory); // 循环等待直到分配成功或超时while (accumulated < size) { .... try { // 当前线程阻塞等待,返回结果为false则表示阻塞超时waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } finally { .... } .... // 申请内存大小是16k,且free缓存池有了空闲的ByteBuffer if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // 从free队列取出一个ByteBuffer buffer = this.free.pollFirst(); // 计算累加器accumulated = size; } else { // 释放空间给非池化可用内存,并继续等待空闲空间,如果分配多了只取够size的空间freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory); // 释放非池化可用内存大小this.nonPooledAvailableMemory -= got; // 累计分配了多少空间accumulated += got; } } accumulated = 0; } finally { // 如果循环有异常,将已释放的空间归还给非池化可用内存this.nonPooledAvailableMemory += accumulated; //把自己从等待队列中移除并结束this.waiters.remove(moreMemory); } } } finally { // 当非池化可用内存有内存或free缓存池有空闲ByteBufer且等待队列里有线程正在等待try { if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty()) // 唤醒队列里正在等待的线程this.waiters.peekFirst().signal(); } finally { // 解锁lock.unlock(); } } // 说明空间足够,并且有足够空闲的了。可以执行真正的分配空间了。 if (buffer == null) // 没有正好的buffer,从缓冲区外(JVM Heap)中直接分配内存return safeAllocateByteBuffer(size); else // 直接复用free缓存池的ByteBuffer return buffer; } private ByteBuffer safeAllocateByteBuffer(int size) { boolean error = true; try { //分配空间ByteBuffer buffer = allocateByteBuffer(size); error = false; //返回buffer return buffer; } finally { if (error) { //分配失败了, 加锁,操作内存pool this.lock.lock(); try { //归还空间给非池化可用内存this.nonPooledAvailableMemory += size; if (!this.waiters.isEmpty()) //有其他在等待的线程的话,唤醒其他线程this.waiters.peekFirst().signal(); } finally { // 加锁不忘解锁this.lock.unlock(); } } } } protected ByteBuffer allocateByteBuffer(int size) { // 从JVM Heap中分配空间return ByteBuffer.allocate(size); } // 不断从free队列中释放空闲的ByteBuffer来补充非池化可用内存private void freeUp(int size) { while (!this.free.isEmpty() && this.nonPooledAvailableMemory < size) this.nonPooledAvailableMemory += this.free.pollLast().capacity(); }

この方法は、主にByteBufferの割り当てを試みるために使用されます。説明する4つの状況が次のとおりです。

ケース1:16Kを申請し、無料のキャッシュプールに使用可能なメモリがあります

この時点で、チームリーダーのBytebufferは、割り当てと使用のために無料のキャッシュプールから直接取得されます。使用後、Bytebufferは無料のキャッシュプールの最後に直接配置され、Clear()が呼び出され、次回の再利用のためにデータをクリアします。

ケース2:16Kが適用され、無料のキャッシュプールに利用できるメモリはありません

現時点では、無料のキャッシュプールに使用可能なメモリがないため、プールされていないメモリから16kのメモリしか取得して、それを割り当てることができます。使用後、ByteBufferをフリーキャッシュプールのテールに直接配置し、Clear()を呼び出して、次回の再利用のためにデータをクリアします。

ケース3:非16Kアプリケーションと無料キャッシュプールに使用できるメモリなし

現時点では、無料のキャッシュプールに利用可能なメモリはありません。非16Kアプリケーションが適用されます。メモリの一部のみが、それを割り当てるために、プールされていない使用可能なメモリ(割り当てるのに十分なスペース)から取得できます。使用後、要求されたメモリスペースは、将来的にはGCによってドロップされる非プールされたメモリに直接放出されます。

ケース4:使用可能なメモリを備えた非プールされていないメモリを備えた非16Kおよびフリーキャッシュプールを申請します

現時点では、無料のキャッシュプールには利用可能なメモリがありますが、アプリケーションは非16Kです。まず、要求されたメモリサイズが満たされるまで、無料のキャッシュプールからBytebufferを非プーリックメモリに放出し、次に非プーリング不可能なメモリから対応するメモリサイズを取得して、割り当てます。使用後、要求されたメモリスペースは、将来GCによってドロップされる非調和のメモリに直接放出されます。

(2)deallocate()

 public void deallocate(ByteBuffer buffer, int size) { // 1.加锁,保证线程安全。 lock.lock(); try { // 2.如果待释放的size大小为16k,则直接放入free队列中if (size == this.poolableSize && size == buffer.capacity()) { // 清空buffer buffer.clear(); // 释放buffer到free队列里this.free.add(buffer); } else { //如果非16k,则由JVM GC来回收ByteBuffer并增加非池化可用内存this.nonPooledAvailableMemory += size; } // 3.唤醒waiters中的第一个阻塞线程Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { // 释放锁lock.unlock(); } }

この方法は、主にByteBufferスペースを解放しようとするために使用され、主に次のことを行います。

  • まずロックを追加して、スレッドの安全性を確保します。
  • リリースされるサイズが16Kの場合、無料キューに直接配置されます。
  • それ以外の場合、bytebufferはJVM GCによってリサイクルされ、非プールドベイレーブルメモリーが追加されます。
  • ByteBufferがリサイクルされたら、ウェイターの最初のブロッキングスレッドを起動します。

最後に、「読み取りおよび書き込み分離シナリオ」CopyOnWriteMapのKafkaのカスタムサポートの実装を見てみましょう。

2。CopyOnWriteMap

GitHubのソースコードアドレスは次のとおりです。

https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/copyonwritemap.java。

RecordAccumulatorクラスのプロパティフィールドから、CopyOnWriteMapのキーがトピックパーティションであり、値はこのパーティションに送信されたDeque <ProcesserBatch>キューコレクションであることがわかります。

メッセージを作成するとき、送信されるパーティションがめったに変更されないことがわかっているため、書き込み操作はほとんどありません。ほとんどの場合、最初にパーティションの対応するキューを取得し、次にプロデューサーバッチをキューの最後に入れるので、読み取り操作が非常に頻繁に行われます。これは、典型的な「もっと読む」シナリオです。

いわゆる「CopyOnWrite」は、執筆時にコピーをコピーして書き込み操作を実行し、執筆後に元のコレクションを交換することを意味します。

ソースコードの実装を見てみましょう。

 public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> { // volatile Map private volatile Map<K, V> map; // 构造函数public CopyOnWriteMap() { this.map = Collections.emptyMap(); }

このクラスには、「揮発性」によって変更される重要なフィールドマップは1つしかありません。目的は、マップが変更されたときにマップが変更されると表示されることです。

いくつかの重要な方法を見てみましょう。これらはすべて比較的単純ですが、実装は非常に古典的です。

03.2.1 get()

 // 获取集合中队列public V get(Object k) { return map.get(k); }

この方法は、主にセットのキューを読み取るために使用されます。読み取り操作がロックされておらず、マルチスレッドの同時読み取りのシーンはブロックされず、高い並行読み取りを実現できます。キューが既に存在する場合は、直接返品してください。

(2)putifabsent()

 public synchronized V putIfAbsent(K k, V v) { if (!containsKey(k)) return put(k, v); else return get(k); } // 判断队列是否存在public boolean containsKey(Object k) { return map.containsKey(k); }

この方法は、主にキューの取得または設定に使用され、複数のスレッドによって同時に実行されます。 「同期」による変更は、スレッドの安全性を確保することができ、キューが存在しない限り設定されます。

(3)put()

 public synchronized V put(K k, V v) { Map<K, V> copy = new HashMap<K, V>(this.map); V prev = copy.put(k, v); this.map = Collections.unmodifiableMap(copy); return prev; }

この方法は主にキューの設定に使用され、PUTの場合は「同期」によって変更されます。これにより、1つのスレッドのみがこの値を同時に更新できるようにします。

それでは、なぜ書き込み操作が読み取り操作をブロックしないと言われているのですか?

  • 最初にハッシュマップコレクションのコピーを再作成します。
  • 「揮発性」書き込み方法を介して対応するセットに値を割り当てます。
  • 新しいコレクションを「変更されていないマップ」に設定し、フィールドマップに割り当てます。

これにより、読み取りと書き込みの分離が実現します。プロデューサーの最もコアの場合、キャッシュプールはマルチスレッドへの同時アクセスで表示されます。したがって、この分野の高い並行性設計は非常に重要です。

IV.結論

ここでは、この記事の重要なポイントを一緒に要約しましょう。

  • 最初に、Kafkaクライアントメッセージの送信バッチの利点を整理しましょう。
  • 現実のシナリオの類推を通じて、記録的な蓄積者の内部構造を理解し、クライアント側でメッセージがどのようにキャッシュされるかと内部コンポーネントの実装原則を深く分析できます。
  • BufferPoolとCopyOnWriteMapの実装原則を深く分析するには、KAFKAクライアントにとって非常に重要です。

<<:  三位一体: クラウドネイティブ、DevOps、プラットフォームエンジニアリング

>>:  Vue の仮想 Dom 技術を学習しましたか?

推薦する

ウェブサイトの外部リンクを増やすために百度百科事典のエントリを作成するためのヒント

すべてのウェブマスターは、Baidu が検索のランキング付けを行う際に、Baidu Encyclop...

新しいトップレベルドメイン: インターネットの巨人たちの饗宴

関連記事: ICANN が Apple、Google などを含む新しいトップレベル ドメイン申請のリ...

ライブストリーミング販売がWeChat Momentsを席巻

9月10日、葛世三は「8:15」WeChatビデオ生放送イベントで観客と「セックスレス結婚」について...

VMware のイノベーションでマルチクラウド製品の機能を再構築

2022年、クラウドコンピューティング業界の発展は転換期を迎えました。過去 3 年間、世界的な流行と...

ウェブサイトの最適化はランキングのためですか、それともユーザーエクスペリエンスのためですか?

ウェブサイトの最適化とは何でしょうか。百科事典では非常にわかりやすく説明されていると思います。しかし...

SEO初心者必読:SEOとホテルについてお話しましょう

今年の初めに、代理店のようなチケット販売のeコマースサイトを作成しました。 1年以上前から計画してい...

モバイル検索の今後の傾向は何でしょうか?

スマートフォンの急速な発展に伴い、モバイル検索の人気が高まっています。 CNNICが実施した第30回...

reprisehosting-27USD/L5520/500GB ハードディスク/50M 無制限トラフィック/2IP/シアトル

reprisehosting のシアトル (米国西海岸、国内速度が優れている) データ センターでは...

SEO は死んでおり、一部の SEO 手法は時代遅れになっています。

先ほど A5 で見た SEO は死んだという記事に関して、私はそうは思いません。私の5つのステーショ...

ウェブマスターネットワークニュース:Qvodは著作権侵害の疑いで巨額の罰金を科されたと報じられている。Qvodは泣き叫ぶだろう

1. Qvodは著作権侵害の疑いで巨額の罰金を科されたとの噂:Yunfanの捜索は反駁できない証拠公...

catalysthost-7 ブレード/1g メモリ/KVM/3 コア/60g ハードディスク/3T トラフィック/G ポート

Catalysthost のプロモーションは、年に 1 回しか行われないブラック フライデーなので、...

電子商取引の進化とは?小さくて美しいことを主張することが力の源

電子商取引の進化。今日の電子商取引は、Suning、JD.com、Dangdang、または他の一連の...

検索エンジンは画像を認識しません。Web デザインはシンプルであるほど良いです。

SEO の基本: 検索エンジンは画像を認識しません。Web デザインはシンプルであるほど良いです。 ...

追手が迫る中、百度はこの危機にどう対処するのだろうか。

「テクノロジーの世界には永遠の王は存在しない。」この言葉はインターネットにも同様に当てはまります。か...

白城旅行網のCEO、曽宋氏:インターネットの利点は

白城旅行網のオフィスに入ると、ロビーに華源旅行社(以下、「華源」)と「白城網」の2つの看板が掲げられ...