Kafka の高パフォーマンスの原理を理解するのに役立つ 6 つの技術的ポイント

Kafka の高パフォーマンスの原理を理解するのに役立つ 6 つの技術的ポイント

みなさんこんにちは。ジュン兄です。

Kafka は、1 秒あたり数千万件のメッセージを処理できる優れたパフォーマンスを備えたメッセージ キューです。今日は、Kafka の高パフォーマンスの背後にある技術的な原理についてお話ししましょう。

1. 一括送信

Kafka はメッセージをバッチで処理します。 Kafka プロデューサーがメッセージを送信するためのコードを見てみましょう。

 private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { TopicPartition tp = null; try { //省略前面代码Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp); //把消息追加到之前缓存的这一批消息上RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); //积累到设置的缓存大小,则发送出去if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch /**省略catch 代码*/ }

コードからわかるように、プロデューサーは doSend メソッドを呼び出した後、メッセージを直接送信するのではなく、メッセージをキャッシュします。キャッシュされたメッセージの量が設定されたバッチ サイズに達するまで、メッセージは送信されません。

注:上記の accumulator.append コードから、メッセージのバッチが同じトピックの同じパーティションに属していることがわかります。

ブローカーはメッセージを受信した後、バッチ メッセージを単一のメッセージに解析してディスクに書き込みません。代わりに、バッチ メッセージとしてディスクに書き込み、バッチ メッセージを他のレプリカに直接同期します。

コンシューマーがメッセージをプルする場合、メッセージを個別にプルするのではなく、バッチでプルします。一連のメッセージをプルした後、それらは消費のために単一のメッセージに解析されます。

メッセージのバッチ送受信を使用すると、クライアントとブローカー間のやり取りの回数が減り、ブローカーの処理能力が向上します。

2. メッセージの圧縮

メッセージ本文が大きい場合、Kafka メッセージのスループットは数千万に達する必要があり、ネットワーク カードでサポートされるネットワーク伝送帯域幅がボトルネックになります。 Kafka のソリューションはメッセージの圧縮です。メッセージを送信するときに、パラメータ compression.type を追加すると、メッセージの圧縮を有効にすることができます。

 public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //开启消息压缩props.put("compression.type", "gzip"); Producer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("my_topic", "key1", "value1"); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { logger.error("sending message error: ", e); } else { logger.info("sending message successful, Offset: ", metadata.offset()); } } }); producer.close(); }

compression.type の値が none に設定されている場合、圧縮は有効になりません。メッセージはいつ圧縮されますか?前述したように、プロデューサーはメッセージを送信する前に一連のメッセージをキャッシュし、送信する前に圧縮します。コードは次のとおりです。

 public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException { // ... try { // ... buffer = free.allocate(size, maxTimeToBlock); synchronized (dq) { //... RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) { // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often... return appendResult; } //这批消息缓存已满,这里进行压缩MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); // Don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); } } finally { if (buffer != null) free.deallocate(buffer); appendsInProgress.decrementAndGet(); } }

上記の recordsBuilder メソッドは、最終的に次の MemoryRecordsBuilder コンストラクターを呼び出します。

 public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, CompressionType compressionType, TimestampType timestampType, long baseOffset, long logAppendTime, long producerId, short producerEpoch, int baseSequence, boolean isTransactional, boolean isControlBatch, int partitionLeaderEpoch, int writeLimit) { //省略其他代码this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); }

上記の wrapForOutput メソッドは、設定された圧縮アルゴリズムに従って圧縮するか、圧縮しないかを決定します。現在、Kafka は gzip、snappy、lz4 の圧縮アルゴリズムをサポートしています。バージョン 2.1.0 以降、Kafka は Zstandard アルゴリズムをサポートします。

ブローカー側では、検証を行うためにヘッダーが解凍されますが、メッセージ本文は解凍されません。メッセージ本体はコンシューマー側で解凍されます。コンシューマーがメッセージのバッチを取得した後、最初にそれらを解凍し、次にメッセージを処理します。

圧縮と解凍は CPU を集中的に使用する操作であるため、メッセージ圧縮を有効にする場合は、プロデューサーとコンシューマーの CPU リソースも考慮する必要があります。

メッセージのバッチ収集と圧縮により、Kafka プロデューサーがメッセージを送信するプロセスは次のようになります。

3. ディスクのシーケンシャル読み取りと書き込み

連続的な読み取りと書き込みにより、アドレス指定の時間が節約され、1 回のアドレス指定で連続的な読み取りと書き込みが可能になります。

SSD では、シーケンシャル読み取りおよび書き込みのパフォーマンスは、ランダム読み取りおよび書き込みのパフォーマンスよりも数倍高くなります。機械式ハードドライブでは、アドレス指定時に磁気ヘッドを移動する必要があり、この機械的な移動に多くの時間がかかります。したがって、機械式ハードドライブのシーケンシャル読み取りおよび書き込みパフォーマンスは、ランダム読み取りおよび書き込みの何十倍も高くなります。

メッセージ データを書き込むとき、Kafka のブローカーはまず各パーティションのファイルを作成し、次にファイルに対応するディスク領域にデータを順番に追加します。ファイルがいっぱいの場合は、データの追加を続行するために新しいファイルを作成します。これにより、アドレス指定時間が大幅に短縮され、読み取りおよび書き込みのパフォーマンスが向上します。

4. ページキャッシュ

Linux システムでは、すべてのファイル IO 操作は、ディスク ファイル用にメモリ内に作成されるキャッシュである PageCache を経由する必要があります。アプリケーションがファイルを読み書きする場合、ディスク上のファイルを直接読み書きするのではなく、PageCache を操作します。

アプリケーションがファイルを書き込む場合、最初にデータが PageCache に書き込まれ、その後、オペレーティング システムが PageCache データを定期的にディスクに書き込みます。以下のように表示されます。

アプリケーションがファイル データを読み取るとき、まずそのデータが PageCache 内にあるかどうかを判断します。そうであれば、データを直接読み取ります。そうでない場合は、ディスクを読み取り、データを PageCache にキャッシュします。

Kafka は PageCache の利点を最大限に活用します。プロデューサーがメッセージを生成する速度がコンシューマーがメッセージを消費する速度と同程度の場合、Kafka は基本的にメッセージをディスクに書き込まずにメッセージの送信を完了できます。

5. ゼロコピー

Kafka Broker がコンシューマーにメッセージを送信する場合、PageCache にヒットしたとしても、まず PageCache 内のデータをアプリケーションのメモリ領域にコピーし、次にアプリケーションのメモリ領域からソケット バッファ領域にコピーしてから、データを送信する必要があります。以下のように表示されます。

Kafka はゼロコピー技術を使用して、データを PageCache からソケット バッファに直接コピーするため、データをユーザー状態メモリ領域にコピーする必要がありません。同時に、DMA コントローラは CPU の関与を必要とせずにデータのコピーを直接完了します。以下のように表示されます。

Java ゼロコピー テクノロジーは、最下層の sendfile メソッドを呼び出す FileChannel.transferTo() メソッドを使用します。

6. mmap

Kafka のログ ファイルは、データ ファイル (.log) とインデックス ファイル (.index) に分かれています。インデックス ファイルの読み取りパフォーマンスを向上させるために、Kafka はインデックス ファイルに mmap メモリ マッピングを使用し、インデックス ファイルをプロセスのメモリ領域にマッピングします。これにより、インデックス ファイルの読み取り時にディスクから読み取る必要がなくなります。以下のように表示されます。

7. まとめ

この記事では、Kafka が高パフォーマンスを実現するために使用する主要なテクノロジーを紹介します。これらの技術は私たちの研究と仕事の参考になります。

<<:  Traefik Proxy v3.0 の画期的な機能を 1 つの記事で理解する

>>:  Kubernetesネットワークモデルの包括的なガイドについてお話ししましょう

推薦する

RivenCloud: 50% オフ、日本 VPS、大阪/東京、Netflix

Riven Cloudは、米国と香港に登録された新しい会社です。主にVPS事業を展開しており、日本の...

IDCの最新データベースレポート:Alibaba Cloudが市場シェアで1位となり、初めて従来のデータベースを上回る

最近、世界的に有名な市場調査機関であるIDCは、2019年下半期の中国リレーショナルデータベース市場...

VMware の Ye Yujian: 製品を統合して完全な SASE ソリューションを構築し、企業のリモート ワークを実現

[51CTO.comからのオリジナル記事]ポストエピデミック時代において、リモートワークは新たな常態...

VULTRはどうですか?カナダのクラウドサーバー(AMDプラットフォーム)の簡単なレビュー

Vultr は米国だけでなくカナダにも複数のデータセンターを持ち、カナダのトロントのデータセンターで...

Duomao Interactiveはインターネットの光を照らし、企業の効率的なマーケティングを支援します

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています今日、イン...

ゲーマーはTencentの広告とToutiaoの広告についてどう思っているのでしょうか?

Alibaba や Baidu とは異なり、Tencent Advertising と Toutia...

ネットワークマーケティングトレーニングの重要性と価値

近年のインターネットの急速な発展は、私たちの生活様式を変え、多くの企業のビジネスモデルを変え、多くの...

5年後のクラウドネイティブはどのようになっているでしょうか?

クラウド ネイティブが 2013 年に Pivotal によって初めて言及されてから 10 年が経ち...

すべてにインターネット思考が必要:「Roujiamo」からマイクロマーケティングを学ぶ

最近、「宇宙の中心」北京五道口に「西少葉」肉家墨という大人気の店が出現した。オープン初日に1,200...

Ancestry.com が売却を検討中:情報筋

北京時間6月6日、外国メディアの報道によると、事情に詳しい情報筋は、米国の系図ウェブサイト「Ance...

百度は360検索が「一線を越えた」かどうかを法律専門家に問う

奇虎360会長 周紅一周紅一氏は、360検索の成長は予想を上回ったと述べた。検索結果は捕捉され、百度...

nixcom-512m メモリ KVM/30g ハードディスク/1T トラフィック/フロリダ/月額 5 ドル

2007 年に設立されたと主張する nixcom は、米国とカナダのデータ センターで VPS (o...

情報ウェブサイトがリンク切れのリスクを回避する方法の簡単な分析

情報ウェブサイトにはコンテンツが多すぎ、特に多くのニュースを収集する必要があるため、ウェブサイト上の...

Linodeはどうですか?西海岸のロサンゼルスデータセンターのクラウドサーバーの簡単なレビュー

Linode はこれまで、米国西海岸に 1 つの FMT データセンターしか持っていませんでした。A...

クラウド コンピューティングはどのような方法でデジタル変革をサポートしますか?

過去 2 年間、デジタル変革に関して傍観者だった多くの企業は、難しい決断を迫られてきました。今こそ、...