Kafka: ビッグデータについて知っておくべきこと

Kafka: ビッグデータについて知っておくべきこと

1. Kafka の概要

Kafka はもともと LinkedIn によって、ZooKeeper 調整に基づくマルチパーティション、マルチレプリカの分散メッセージング システムとして Scala 言語を使用して開発されました。現在、Apache Foundation に寄贈されています。現在、Kafka は分散ストリーム処理プラットフォームとして位置付けられています。高いスループット、永続性、水平スケーラビリティ、ストリーム データ処理のサポートなどの機能により、広く使用されています。主に Scala と Java で書かれています。

これは、イベント ストリーム データを処理できる高スループットの分散パブリッシュ/サブスクライブ メッセージング システムです。 Kafka を使用すると、メッセージをサブスクライブしたい受信者に、公開したいメッセージを簡単に配信できます。アップストリーム プロデューサーは、指定された Kafka のトピックにメッセージを入力するだけでよく、ダウンストリーム レシーバーはトピックをサブスクライブするだけで、低レイテンシかつ高スループットでアップストリーム メッセージを受信できます。 Kafka は、同じトピックが複数の下流コンシューマーによって同時に消費されることもサポートしており、異なるコンシューマーのデータ処理の進行が互いに干渉することはありません。

  • トピックの場合、各パーティションは、一度に同じコンシューマー グループ内の 1 つのコンシューマーによってのみ消費されます。
  • AMQ と比較すると、より軽量です。非侵襲的で、依存関係が非常に少なく、リソースをほとんど占有せず、展開が容易で、依存関係が多すぎず、使いやすいです。

現在、Cloudera、Storm、Spark、Flink などのオープンソースの分散処理システムが Kafka との統合をサポートするようになっています。 Kafka がますます人気を集めている理由は、それが果たしている 3 つの主要な役割と切り離せません。

  • メッセージング システム: Kafka と従来のメッセージング システム (メッセージング ミドルウェアとも呼ばれます) はどちらも、システム分離、冗長ストレージ、トラフィック ピークの削減、バッファリング、非同期通信、スケーラビリティ、回復可能性などの機能を備えています。同時に、Kafka は、ほとんどのメッセージング システムでは実現が難しいメッセージ順序の保証とバックトラッキング消費機能も提供します。
  • ストレージ システム: Kafka はメッセージをディスクに保存するため、他のメモリベースのストレージ システムと比較してデータ損失のリスクが効果的に軽減されます。 Kafka のメッセージ永続化機能とマルチコピーメカニズムのおかげで、Kafka を長期データストレージシステムとして使用できます。対応するデータ保持ポリシーを「永続的」に設定するか、トピックのログ圧縮機能を有効にするだけです。
  • ストリーム処理プラットフォーム: Kafka は、一般的なストリーム処理フレームワークごとに信頼性の高いデータ ソースを提供するだけでなく、ウィンドウ、結合、変換、集計などの操作を含む完全なストリーム処理ライブラリも提供します。

2. Kafka はどのような問題を解決しますか?

メッセージ キューは一般に、非同期処理、サービス分離、フロー制御を扱います。したがって、メッセージ キューの一種である Kafka は、これらの問題も解決します。

3. Kafka の技術的特徴

  • 高いスループットと低いレイテンシ: Kafka は、最小レイテンシがわずか数ミリ秒で、1 秒あたり数十万件のメッセージを処理できます。各トピックは複数のパーティションに分割でき、コンシューマー グループはパーティションに対して並列消費操作を実行します。
  • スケーラビリティ: Kafka クラスターはホット拡張をサポート
  • 永続性と信頼性: メッセージはローカル ディスクに永続化され、データの損失を防ぐためにデータのバックアップがサポートされます。メッセージは消費後すぐに削除されるのではなく、有効期限が設定されます。
  • フォールトトレランス: クラスター内のノードが障害を起こしてもかまいません (レプリカの数が n の場合、n-1 個のノードが障害を起こしてもかまいません)
  • 高い同時実行性: 数千のクライアントの同時読み取りと書き込みをサポート
  • キューモード: すべてのコンシューマーが 1 つのキューに存在するため、メッセージはキュー内で分割され、並列に消費されます。
  • サブスクライブ・パブリッシュモード: すべてのコンシューマーがキューにいなくなるため、トピックメッセージはサブスクライブしているすべてのコンシューマーにブロードキャストできます。

4. Kafka の仕組み

(1)アーキテクチャ図

画像

プロデューサー:

メッセージプロデューサーは、メッセージを送信する側です。プロデューサーはメッセージを作成し、それを Kafka に配信する責任を負います。

消費者:

メッセージ コンシューマーは、メッセージを受信する側です。コンシューマーは Kafka に接続してメッセージを受信し、対応するビジネス ロジック処理を実行します。

消費者グループ(CG):

コンシューマー グループは複数のコンシューマーで構成されます。コンシューマー グループ内の各コンシューマーは、異なるパーティションからのデータを消費する責任を負います。パーティションは 1 つのグループ内のコンシューマーによってのみ消費されます。消費者グループは互いに影響を及ぼしません。すべてのコンシューマーはコンシューマー グループに属します。つまり、コンシューマー グループは論理的なサブスクライバーです。

ブローカ:

サービス プロキシ ノード。 Kafka の場合、Broker は単純に独立した Kafka サービス ノードまたは Kafka サービス インスタンスと見なすことができます。ほとんどの場合、サーバーにデプロイされている Kafka インスタンスが 1 つだけであれば、Broker は Kafka サーバーと見なすこともできます。 1 つ以上のブローカーが Kafka クラスターを形成します。一般的に、サービス プロキシ ノードを表すには、小文字の最初の文字のブローカーを使用するのが一般的です。

コントローラ:

クラスターには 1 つ以上のブローカーがあり、そのうちの 1 つがコントローラー (Kafka コントローラー) として選出され、クラスター全体のすべてのパーティションとレプリカのステータスを管理する役割を担います。

  1. パーティションのリーダー レプリカに障害が発生した場合、コントローラーはパーティションの新しいリーダー レプリカを選択する責任を負います。
  2. パーティションの ISR セットで変更が検出されると、コントローラはすべてのブローカーにメタデータ情報を更新するように通知する役割を担います。
  3. トピックのパーティション数が増加した場合でも、コントローラーはパーティションの再割り当てを担当します。

Kafkaにはトピックとパーティションという2つの特に重要な概念があります。

トピック:

これはキューとして理解でき、キューの両端にはプロデューサーとコンシューマーがあり、一方がデータを出力し、もう一方がデータを消費し、両方とも同じトピックに向けられています。

パーティション:

スケーラビリティを実現するために、非常に大量のデータを持つトピックを複数のブローカー (サーバー) に分散することができます。トピックは複数のパーティションに分割でき、各パーティションは順序付けられたキューです。トピックの同時実行性は基本的にパーティションの数と等しくなります。

Kafka 内のメッセージはトピック別に分類されます。プロデューサーは特定のトピックにメッセージを送信する責任があり (Kafka クラスターに送信される各メッセージはトピックを指定する必要があります)、コンシューマーはトピックをサブスクライブして消費する責任があります。

トピックは論理的な概念であり、さらに複数のパーティションに分割できます。パーティションは 1 つのトピックにのみ属し、パーティションはトピック パーティションと呼ばれることがよくあります。同じトピックの下にある異なるパーティションには、異なるメッセージが含まれます。ストレージ レベルでは、パーティションは追加可能なログ ファイルと見なすことができます。メッセージがパーティション ログ ファイルに追加されると、特定のオフセットが割り当てられます。

オフセットは、パーティション内のメッセージの一意の識別子です。 Kafka はこれを使用して、パーティション内のメッセージの順序を確保します。ただし、オフセットはパーティションにまたがりません。つまり、Kafka はトピックではなくパーティションの順序を保証します。

画像

上図に示すように、トピックには 4 つのパーティションがあり、各パーティションのログ ファイルの末尾にメッセージが順番に追加されます。 Kafka のパーティションは、異なるサーバー (ブローカー) に分散できます。つまり、トピックは複数のブローカーにまたがって、単一のブローカーよりも強力なパフォーマンスを提供できます。

各メッセージがブローカーに送信される前に、パーティション分割ルールに基づいて特定のパーティションが保存用に選択されます。パーティション分割ルールが適切に設定されていれば、すべてのメッセージを異なるパーティションに均等に分散できます。トピックが 1 つのファイルのみに対応する場合、ファイルが配置されているマシンの I/O がトピックのパフォーマンスのボトルネックになります。パーティショニングによりこの問題は解決されます。トピックを作成するときに、パラメータを指定してパーティションの数を設定できます。もちろん、トピックの作成後にパーティションの数を変更することもできます。パーティションの数を増やすことで水平方向の拡張が可能になります。

レプリカ:

Kafka はパーティションのマルチコピー (レプリカ) メカニズムを導入し、レプリカの数を増やすことで災害復旧機能を向上させることができます。

同じメッセージは同じパーティションの異なるレプリカに保存されます (同時に、レプリカはまったく同じではありません)。レプリカは「1 つのマスターと複数のスレーブ」の関係にあり、リーダー レプリカは読み取りおよび書き込み要求の処理を担当し、フォロワー レプリカはリーダー レプリカとのメッセージの同期のみを担当します。レプリカは異なるブローカーにあります。リーダー レプリカに障害が発生すると、外部サービスを提供するためにフォロワー レプリカから新しいリーダー レプリカが再選出されます。 Kafka は、マルチコピー メカニズムを通じて自動障害転送を実装し、Kafka クラスター内のブローカーに障害が発生した場合でもサービスの可用性を確保できます。

画像

上図に示すように、Kafka クラスターには 4 つのブローカーがあり、トピックには 3 つのパーティションがあり、レプリケーション係数 (つまり、コピー数) も 3 であるため、各パーティションにはリーダー コピーが 1 つとフォロワー コピーが 2 つあります。プロデューサーとコンシューマーはリーダー コピーとのみ対話し、フォロワー コピーはメッセージの同期のみを担当します。多くの場合、フォロワー コピーのメッセージはリーダー コピーより遅れます。

Kafka コンシューマー側にも、特定の災害復旧機能があります。コンシューマーはプル モードを使用してサーバーからメッセージをプルし、消費の特定の場所を保存します。クラッシュ後にコンシューマーが復元されると、以前に保存された消費場所に基づいて消費に必要なメッセージを再度取得できるため、メッセージが失われることはありません。

(2)カフカの執筆過程

  1. zk クラスターに接続し、対応するトピックのパーティション情報とパーティション リーダーの関連情報を zk から取得します。
  2. 対応するブローカーにメッセージを送信する
  3. メッセージを送信する場合、クライアントはメッセージのトピックと値を指定する必要があります。さらに、メッセージのパーティションとキーを指定することもできます。
  4. メッセージをシリアル化する
  5. メッセージ レコードでパーティションが指定されている場合、Partitioner は何も行いません。それ以外の場合、パーティショナーはメッセージのキーに基づいてパーティションを取得します。これにより、プロデューサーはどのトピックのどのパーティションにメッセージを送信するかを知ることができます。
  6. メッセージは対応するバッチに追加され、独立したスレッドがこれらのバッチをブローカーに送信します (メッセージはブローカーに 1 つずつ送信されるのではなく、メッセージのバッチが送信される前にクライアント上でローカルにキャッシュされることに注意してください。したがって、クライアントはメッセージをバッチで送信します。つまり、バッチには 1 つ以上のメッセージが含まれます。同様に、ブローカーもデータをバッチで保存しますが、これについては後で説明します)。
  7. ブローカーはメッセージを受信し、応答を返します。メッセージが Kafka に正常に書き込まれると、パーティション内のメッセージのトピック情報、パターン情報、オフセット情報を含む成功メッセージが返されます。失敗した場合はエラーが返されます。

(3)カフカの読み方

  1. zk クラスターに接続し、対応するトピックのパーティション情報とパーティションのリーダーを zk から取得します。
  2. 対応するリーダーの対応するブローカーに接続する
  3. コンシューマーは、読み取りたいトピック、パーティション、および対応するオフセットをリクエストを通じてリーダーに送信します。
  4. リーダーはオフセットなどの情報に基づいてセグメント(インデックスファイルとログファイル)を見つけます。
  5. インデックスファイルの内容に従って、ログファイル内のオフセットに対応する開始位置を見つけ、対応する長さのデータを読み取り、コンシューマーに返します。

5. Kafkaデータ構造の説明

(1)ZookeeperにおけるKafka登録データ構造

Kafka は Zookeeper を使用して一部のメタ情報を保存し、Zookeeper 監視メカニズムを使用してメタ情報の変更を検出し、対応するアクション (コンシューマー障害、負荷分散のトリガーなど) を実行します。

Kafka クラスターの登録情報:永続ノード、クラスター ID。最初のブローカーが起動し、/cluster/id が存在しないことが判明すると、その cluster.id 構成が zk に書き込まれます。現在の zk がどのクラスターに属しているかをマークします。他のブローカーは起動時にデータを取得します。データが構成と一致していないことが判明した場合、例外がスローされ、同じクラスターに参加できなくなります。

データノード: /cluster/id、データサンプル:

 {
「バージョン」 : 「1」
「id」 : 「0」
}

ブローカーノード登録:一時ノード。 Kafka ブローカーが起動されると、まずそのノード情報 (一時 znode) が Zookeeper に登録されます。同時に、ブローカーと Zookeeper が切断されると、この znode も削除されます。

各ブローカーの構成ファイルでは、数値 ID (グローバルに繰り返し不可) を指定する必要があり、znode 値にはブローカーのホスト、ポート、セキュリティ構成などの情報が含まれます。

ブローカーがクラッシュするか、そのパーティションに障害が発生すると、Zookeeper はパーティション ノードのミューテックスを解放し、他のブローカーがこのパーティションのリーダーを再選出してステータス情報を更新します。

データノード: **/brokers/ids/[0...N]** → host:port [0..N]はブローカーIDを表します。

トピック登録データ:一時的な znode。ブローカーが起動すると、ブローカーは独自のトピックとパーティション情報を Zookeeper に登録します。 znode 値には、トピックのパーティション情報、ISR 情報などが含まれます。

データ ノード: /brokers/topics/[topic]/[0...N]。ここで、[0..N] はパーティション インデックス番号を表します。

トピック パーティション ステータス情報ノード:リーダーが誰であるか、どのサーバーが使用可能であるかを記録します。

データノード: /brokers/topics/[topic]/partitions/[0…N]。

補助選択情報ノード (一時ノード) は、現在のコントローラー ロールの BrokerId を記録します。ノードを削除すると、すぐに再選挙がトリガーされます。

データノード: /controller。

データサンプル:

 [ zk : localhost : 2181 ( 接続済み) 1 ] 取得/ コントローラー
{ "バージョン" : 1"ブローカーID" : 1001"タイムスタンプ" : "1653989876415" }
cZxid = 0x100000029
ctime = 2022年5月31日火曜日17:37:56 CST
mZxid = 0x100000029
mtime = 2022年5 31 火曜日17:37:56 CST
pZxid = 0x100000029
変換= 0
データバージョン= 0
aclバージョン= 0
一時所有者= 0x3811979ba7c0001
データ長= 57
子の数= 0

選挙管理ノード:選挙回数を記録します。 Kafka クラスターの最初のブローカーが初めて起動された場合、番号は 1 になります。将来、クラスター内の中央コントローラーが配置されているブローカーが変更されたり、ハングアップしたりしても、新しい中央コントローラーが再選出されます。センターコントローラーが変更されるたびに、controller_epoch値は + 1 になります。

データノード: /controller_epoch。

データサンプル:

ISR 変更通知ノード: ブローカー上の LogDir (ログが配置されているディレクトリ) に例外が発生した場合 (ディスクの破損、ファイルの読み取りおよび書き込みの失敗など)、または ISR にその他の変更が発生した場合 (トピックへのパーティションの追加など)。子ノード **/log_dir_event_notification/**log_dir_event_sequence 番号が zk に追加されます。コントローラーはこのノードの変更を監視した後、ブローカーに LeaderAndIsrRequest リクエストを送信します。その後、オフライン レプリカに対していくつかのフォローアップ操作を実行します。

  1. 破損した LogDir が配置されているノードにレプリカ リーダーが存在する場合は、新しい選出が実行されます。
  2. 破損した LogDir が配置されているノードのレプリカは、他の正常なブローカーに再配布されます。

データ ノード: /isr_change_notification。

(2)Kafkaトピックデータ構造

Kafka メッセージはトピックに基づいており、トピックは互いに独立しています。 Event_topic を例に挙げます。 10 個のパーティションで構成されています。パーティションの数は、トピックの作成時に指定することも、トピックの作成後に変更することもできます。ただし、トピックのパーティションの数を増やすことはできますが、減らすことはできません。各パーティションには 1 つ以上のレプリカを含めることができます。

ストレージ構造上のパーティションの各コピーは、ログ オブジェクトに対応します。各ログは、サイズとタイムスタンプに応じて複数の LogSegments に分割されます。各 LogSegment には、ログ ファイルと 2 つのインデックス ファイルが含まれます (図を参照)。 2 つのインデックス ファイルは、オフセット インデックス ファイルとタイムスタンプ インデックス ファイルです。

Kafka データ ディレクトリ: **/sensorsdata/seqdata00/kafka/data**。

データ ディレクトリには 5 つのファイルがあります。

  • 000000000000000000100.log
  • 000000000000000000100.インデックス
  • 000000000000000000100.タイムインデックス
  • 00000000000000000100.スナップショット
  • リーダーエポックチェックポイント

注意: インデックス、ログ、およびタイムインデックス ファイルはバイナリ テキストです。 kafka ツールを使用して内容を表示できます。チェックポイント ファイルは直接開くことができます。 ****ここでは、Kafka のログ ファイル ストレージとオフセット インデックスに焦点を当てます。

ログファイル 000000000000000000100.log

データ ファイルはメッセージを保存するために使用されます。各メッセージは、固定長のメッセージ ヘッダーと可変長のメッセージ本体で構成されます。ファイル サイズは、log.segment.bytes パラメータで設定できます。

前述のように、Broker はデータをバッチで書き込むため、ログ ファイルにはデータがバッチで保存されます。バッチには 1 つ以上のメッセージを含めることができます。バッチ形式は次のとおりです。

  • 開始オフセット: 8 バイトを占め、現在のバッチの最初のメッセージのオフセット、つまりバッチの最初のメッセージとファイル全体でのその番号を格納します。
  • 位置: ファイル内の現在のバッチの最初のメッセージのバイナリ開始位置。
  • 長さ: 4 バイトを占め、バッチ全体が占めるディスク領域のサイズを格納します。このフィールドを通じて、Kafka はメッセージをトラバースするときに、データ読み取りのために次のバッチにすばやくジャンプできます。
  • パーティション リーダー バージョン番号: 現在のメッセージが配置されているパーティションのリーダーのサーバー バージョンを記録します。主に一部のデータ バージョンの検証と変換作業に使用されます。
  • CRC: 現在のデータ バッチ全体の CRC チェック コードは、主にデータのエラー チェックに使用されます。
  • 属性: 2 バイトを占有します。このフィールドの下位 3 ビットは、現在のバッチ内のメッセージの圧縮方法を記録します。現在、主なタイプは GZIP、LZ4、Snappy の 3 つです。 4 番目のビットはタイムスタンプのタイプを記録し、5 番目と 6 番目のビットは新しいバージョンで導入されたトランザクション タイプと制御タイプを記録します。
  • 最大変位増分: 最初のメッセージに対する最新メッセージの変位のみが増分となります。
  • 開始タイムスタンプ: 8 バイトを占め、バッチ内の最初のメッセージのタイムスタンプを記録します。
  • 最大タイムスタンプ: 8 バイトを占め、バッチ内の最新メッセージのタイムスタンプを記録します。
  • PID、プロデューサー エポック、開始シーケンス番号: これら 3 つのパラメーターは、主にトランザクションと冪等性を実装するために使用されます。 PID とプロデューサー エポックは、現在のプロデューサーが正当かどうかを判断するために使用され、開始シーケンス番号は主にメッセージの冪等性検証を実行するために使用されます。
  • メッセージ数: 4 バイトを占め、現在のバッチ内のすべてのメッセージの数を記録します。

より詳細な手順については、公式 Web サイトをご覧ください: https://kafka.apache.org/21/documentation.html#recordbatch。

データサンプル:

 [ root @hybrid03 event_topic - 5 ] # sh kafka / bin / kafka - run - class . sh カフカツールDumpLogSegments -- ファイル00000000000000000100.l og
000000000000000000100.log をダンプしています
開始オフセット: 100
baseOffset : 100 lastOffset : 100 count : 1 baseSequence : - 1 lastSequence : - 1 producerId : - 1 producerEpoch : - 1 partitionLeaderEpoch : 0 isTransactional : false position : 0 CreateTime : 1654687438348 isvalid : true size : 711 magic : 2 compressedcodec : GZIP crc : 502062677
baseOffset : 101 lastOffset : 102 count : 2 baseSequence : - 1 lastSequence : - 1 producerId : - 1 producerEpoch : - 1 partitionLeaderEpoch : 0 isTransactional : false position : 711 CreateTime : 1654687446472 isvalid : true size : 866 magic : 2 compressedcodec : GZIP crc : 2103664298
baseOffset : 103 lastOffset : 103 count : 1 baseSequence : - 1 lastSequence : - 1 producerId : - 1 producerEpoch : - 1 partitionLeaderEpoch : 0 isTransactional : false position : 1577 CreateTime : 1654742637828 isvalid : true size : 779 magic : 2 compressedcodec : GZIP crc : 2848436952
baseOffset : 104 lastOffset : 104 count : 1 baseSequence : - 1 lastSequence : - 1 producerId : - 1 producerEpoch : - 1 partitionLeaderEpoch : 0 isTransactional : false position : 2356 CreateTime : 1654745849290 isvalid : true size : 710 magic : 2 compressedcodec : GZIP crc : 103403051
baseOffset : 105 lastOffset : 111 count : 7 baseSequence : - 1 lastSequence : - 1 producerId : - 1 producerEpoch : - 1 partitionLeaderEpoch : 0 isTransactional : false position : 3066 CreateTime : 1654745852374 isvalid : true size : 1060 magic : 2 compressedcodec : GZIP crc : 4064619696

さらに詳しい情報:各メッセージ レコードの内部構造: https://kafka.apache.org/21/documentation.html#record。

インデックスファイル: 000000000000000000100.index

Kafka には、主にオフセット インデックス ファイルとタイムスタンプ インデックス ファイルの 2 種類のインデックス ファイルがあります。

  • 変位インデックス ファイルには、メッセージの変位と、変位に対応するメッセージの物理アドレスが格納されます。
  • タイムスタンプ インデックス ファイルには、メッセージのタイムスタンプとメッセージの変位値が格納されます。

この記事では位置インデックスについてのみ説明します。検索効率を向上させるために、Kafka は各データ ファイルに対してオフセットベースのインデックス ファイルを作成します。データファイルは同じ名前を持ち、サフィックスは.indexです変位インデックス ファイルに関して注意すべき点が 3 つあります。

  • Kafka メッセージはバッチで保存されるため、インデックス ファイル内のインデックス要素の最小単位はバッチになります。つまり、インデックス ファイルをシフトすることで、メッセージが配置されているバッチを見つけることができますが、バッチ内のメッセージの具体的な位置を見つけることはできません。メッセージを検索する場合、バッチをさらに走査する必要があります。
  • インデックス ファイルはインデックスを保存するために使用されます。インデックスは、データ ファイル内のメッセージの物理的な場所にオフセットをマップするために使用されます。各インデックス エントリは、オフセット (メッセージの番号) と位置 (メッセージの物理的な場所) で構成されます。各インデックス エントリは、データ ファイル内のメッセージを一意に識別します。インデックス エントリのオフセットと位置は、データ ファイル内のメッセージのオフセットと位置と 1 対 1 で対応します。たとえば、データ ファイル内のメッセージのオフセットが 114、位置が 4126 の場合、そのメッセージに対してインデックスが作成されると、インデックス ファイル内のインデックス値はオフセットが 114、位置が 4126 になります
  • すべてのメッセージにインデックスがあるわけではありません。 Kafka はスパース ストレージ方式を使用して、一定数のデータ バイトごとにインデックスを作成します。インデックスの範囲はindex.interval.bytesを通じて設定できます。
 [ root @hybrid03 event_topic - 5 ] # sh kafka / bin / kafka - run - class . sh カフカツールDumpLogSegments -- ファイル00000000000000000100。 インデックス
000000000000000000100 をダンプしますインデックス
オフセット: 114 位置: 4126 「説明: 114 番目のメッセージは、ファイル4126 番目の物理位置から読み取る必要があります」
オフセット: 140 位置: 8254
オフセット: 178 位置: 13332
オフセット: 232 位置: 18278
オフセット: 286 位置: 23340 「ここから、センサーが使用するスパース係数は26 であることがわかります

6. Kafka の運用と保守

Kafka コマンドライン ツールのパス: xxx/kafka/bin/。

(1)トピック管理指示

トピックの作成、削除、パーティションの拡張、トピックの詳細の照会、トピック リストの表示など、トピックを管理できます。

コマンドツール: kafka-topics.sh。

使用されている Kafka バージョンが 2.11 の場合、Kafka バージョン >= 2.2 は --bootstrap-server パラメータをサポートし、その他のバージョンでは --zookeeper のみを使用できます。

 # トピックを作成する:
kafka - トピックsh -- 作成-- zookeeper localhost : 2181 -- レプリケーション- 係数3 -- パーティション3 -- トピックテスト
# トピックパーティションの拡張
kafka - トピックsh -- zookeeper localhost : 2181 -- alter -- topic test -- パーティション4
# トピックを削除:
kafka - トピックsh -- 削除-- zookeeper ローカルホスト: 2181 ローカルホスト: 9092 -- トピックテスト
#クエリトピックの詳細
[ DEV ( v.v ) sa_cluster @hybrid03bin ] $ ./kafka - topics . sh -- トピックevent_topic -- zookeeper ローカルホスト: 2181 -- 説明
トピック: event_topic パーティション数: 10 レプリケーション係数: 2 構成: 圧縮タイプ= gzip
トピック: event_topic パーティション: 0 リーダー: 1001 レプリ​​カ: 1001、1003 Isr : 1001、1003
トピック: event_topic パーティション: 1 リーダー: 1003 レプリカ: 1003、1002 Isr : 1003、1002
トピック: event_topic パーティション: 2 リーダー: 1002 レプリカ: 1002、1001 Isr : 1002、1001
トピック: event_topic パーティション: 3 リーダー: 1001 レプリ​​カ: 1001、1002 Isr : 1001、1002
トピック: event_topic パーティション: 4 リーダー: 1003 レプリカ: 1003、1001 Isr : 1003、1001
トピック: event_topic パーティション: 5 リーダー: 1002 レプリカ: 1002、1003 Isr : 1002、1003
トピック: event_topic パーティション: 6 リーダー: 1001 レプリ​​カ: 1001、1003 Isr : 1001、1003
トピック: event_topic パーティション: 7 リーダー: 1003 レプリカ: 1003、1002 Isr : 1003、1002
トピック: event_topic パーティション: 8 リーダー: 1002 レプリカ: 1002、1001 Isr : 1002、1001
トピック: event_topic パーティション: 9 リーダー: 1001 レプリ​​カ: 1001、1002 Isr : 1001、1002
#すべてのトピックを一覧表示
kafka - トピックsh -- ブートストラップ- サーバーxxxxxx : 9092 -- リスト-- 除外- 内部

(2)ノード追加・削除後のデータバランス

データ ノードを追加した後、ブローカーは新しいノードで起動されますが、Kafka はデータを自動的にバランス調整しないため、手動で行う必要があります。

コマンド ツール: kafka-reassign-partitions.sh。

  1. 構成ファイル move-json-file.json を記述して、どのトピックを再パーティション化するかを Kafka に伝えます。
 {
「トピック」 : [{
「トピック」 : 「イベントトピック」
},
{
「トピック」 : 「プロファイルトピック」
},
{
「トピック」 : 「item_topic」
}
]、
「バージョン」 : 1
}

割り当て情報を生成するコマンドを実行します。この時点ではパーティションの移動はまだ開始されておらず、現在の割り当てと提案のみが通知されることに注意してください。ロールバックする場合に備えて、現在の割り当てを保存します。

 #以下-- broker - リストパラメータbrokerid 対応します
[ DEV ( v . v ) クラスター@hybrid03 bin ] $ ./kafka -reassign - partitions . sh -- zookeeper localhost : 2181 -- 移動するトピックjson ファイル~ / mv json -- ブローカー- リスト"1001,1002" -- 生成
現在のパーティションレプリカの割り当て#現在の割り当て情報
{ "version" : 1"partitions" :[{ "topic" : "event_topic""partition" : 2"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""partition" : 8"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""partition" : 3"replicas" :[ 10011002 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""パーティション" : 6"レプリカ" :[ 10011003 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 9"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "アイテムトピック""パーティション" : 0"レプリカ" :[ 10011003 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 0"レプリカ" :[ 10011003 ]、 "log_dirs" :[ "any" , "any" ]},{ "topic" : "event_topic""partition" : 5"replicas" :[ 10021003 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "profile_topic""partition" : 2"replicas" :[ 10011003 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "profile_topic""partition" : 1"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""パーティション" : 4"レプリカ" :[ 10031001 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 1"レプリカ" :[ 10031002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 7"レプリカ" :[ 10031002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "プロファイルトピック""パーティション" : 0"レプリカ" :[ 10031002 ]、 "log_dirs" :[ "任意" , "任意" ]}]}
提案されたパーティション再割り当て構成# 割り当て後の情報
{ "version" : 1"partitions" :[{ "topic" : "event_topic""partition" : 7"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "profile_topic""partition" : 1"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""partition" : 1"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "item_topic""パーティション" : 0"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 4"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 9"レプリカ" :[ 10021001 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 6"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "any" , "any" ]},{ "topic" : "event_topic""partition" : 3"replicas" :[ 10021001 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""partition" : 8"replicas" :[ 10011002 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "profile_topic""partition" : 0"replicas" :[ 10011002 ]、 "log_dirs" :[ "any""any" ]},{ "topic" : "event_topic""パーティション" : 0"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 5"レプリカ" :[ 10021001 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "プロファイルトピック""パーティション" : 2"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "任意""任意" ]},{ "トピック" : "イベントトピック""パーティション" : 2"レプリカ" :[ 10011002 ]、 "log_dirs" :[ "任意" , "任意" ]}]}

上記で取得した目的の再配布方法ファイルを json ファイル reassignment-json-file.json に保存し、パラメータ --execute を使用して割り当てを実行します。

 ./kafka - 再割り当て- パーティション. sh -- zookeeper LOCALHOST : 2181 -- 割り当て-json - file 割り当て-json - file  json -- 実行

このコマンドは、次のシナリオでも使用できます。

  • パーティションにレプリカを追加するには、手順 2 で生成されたコンテンツの replicas パラメータに、追加するレプリカのブローカー ID 情報を追加するだけです。実行すると、対応するブローカーにレプリカが自動的に作成されます。
  • パーティションを再割り当てします。

(3)摂取方法

グループの消費量を表示します。

 # グループ: グループID名を指定します
./kafka-consumer-groups .sh --bootstrap - サーバー127.0 .0 .1 : 9092 --describe --group テスト- グループ
#例
# TOPIC : グループに対応するトピック
# PARTITION: パーティション番号。0から始まり、0 - 5は6つのパーティションがあることを意味します。
# CURRENT - OFFSET: このコンシューマーが現在消費しているオフセット
# LOG - END - OFFSET: プロデューサーがこのパーティションで確認するために送信したオフセット
# LAG: 2 つのオフセットの差。バックログとも呼ばれます。この値は大きすぎると異常です。
# HOST: 消費者が配置されているサーバーのIPアドレス
# クライアント- ID: 消費者に関する情報
./kafka-consumer-groups .sh --bootstrap - server localhost : 9092 --describe -- group test - group

グループを削除する:

 ./kafka-consumer-groups .sh --bootstrap - サーバー127.0 .0 .1 : 9092 --delete --group テスト- グループ

消費者の置き換えをリセットする:

 最早戦略: 変位を現在の最早変位に調整する
bin / kafka- コンシューマーグループ SH -BOOTSTRAP -SERVER KAFKA- ホスト ポート- グループテスト- グループ- リセット- オフセット- すべて- トピック- 初期- 実行
最新の戦略:現在の最新の変位への変位を調整します
Bin / Kafka- 消費者- グループ sh -bootstrap- サーバーkafka- ホスト ポート- グループテスト- グループ- リセット- オフセット- すべて- トピック- 最新- 実行- 実行
現在の戦略:最新の提出された変位に変位を調整します
Bin / Kafka- 消費者- グループ SH -BOOTSTRAP -SERVER KAFKA- ホスト ポート- グループテスト- グループ- リセット- オフセット- すべて- トピック- current- 実行- 実行
指定- オフセット戦略:指定された変位への変位を調整します
Bin / Kafka- 消費者- グループ sh -bootstrap -server kafka- ホスト ポート- グループテスト- グループ- リセット- オフセット- すべて- トピック- オフセット<offset> - 実行
シフト- by -n戦略:現在の変位+ nnは負の値になる可能性があるへの変位を調整します
Bin / Kafka- 消費者- グループ SH -BOOTSTRAP -SERVER KAFKA- ホスト ポート- グループテスト- グループ- トピックテスト- リセット- オフセット- シフト- <offset_n> - 実行
DateTime戦略:(指定された時間よりも大きい最小変位への変位を調整します)
時間を8に短縮する必要があります
Bin / Kafka- 消費- グループ SH -BOOTSTRAP -SERVER KAFKA- ホスト ポート- グループテスト- グループ- トピックテスト- リセット- オフセット- TO -DATETIME 2019-06-20 T20 00 00.000- 実行
持続時間戦略:現在の時間から指定された間隔の変位への変位を調整し、変位を電流指定された時間間隔の変位に調整します。特定の形式はpndtnhnmnsです。
文字Pで始まり、それぞれd、h、m sの4つの部分で構成されています

Bin / Kafka- 消費者- グループ SH -BOOTSTRAP -SERVER KAFKA- ホスト ポート- グループテスト- グループ- リセット- オフセット- by 期間PT0H30M0S- 実行

(4)トピックの有効期限を設定します

  トピックの有効期限を設定する( ミリ秒単位)
### 3600000 ミリ秒= 1 時間
./bin/kafka -configs.sh -zookeeper 127.0 .0 .1 2181 -Alter -entity -name topy -elk -elk -log -hechuan -huanbao- エンティティ- タイプトピック- 追加- 構成定着.ms = 3600000
#view トピック構成
./bin/kafka- 構成 SH -Zookeeper 127.0 .0 .1 2181- 説明- エンティティ- 名前トピック- DevOps -Elk -Log -Hechuan -Huanbao- エンティティ- タイプトピック

(5)ツール関連

スクリプトを使用してメッセージを作成/消費します

 #connect to test- トピックを入力して、 +を入力してメッセージを作成します
$ bin / kafka- コンソール- プロデューサー SH- ブローカー- リストKafka- ホスト ポート- トピックテスト- トピック- プロデューサー- プロパティ
>
- from- 最初最初からメッセージを消費するように指定します。そうしないと、メッセージは最新の場所から消費されます
$ bin / kafka- コンソール- 消費者 SH -BOOTSTRAP -SERVER KAFKA- ホスト ポート- トピックテスト- トピック- グループテスト- グループ- 最初- 消費- プロパティ

Kafkaパフォーマンステスト

 #test プロデューサー
指定されたトピックに1,000万のメッセージが送信され メッセージサイズは1 kbです
#it ​​テストプロデューサーのスループットMB / s、メッセージの送信のメッセージ、およびさまざまな分位数でのレイテンシを印刷します
$ bin / kafka- プロデューサー- パフォーマンス- テスト SH- トピックテスト- トピック- num- 記録10000000- スループット-1- レコード- サイズ1024- プロデューサー- ブートストラップ小道具 Servers = Kafka -HOSTPORT ACKS = -1 LINGER MS = 2000 圧縮Type = LZ4
2175479 記録 435095.8 記録/ 424.90 MB / )、 131.1 ミリ秒評価681.0 ミリ秒の最大レイテンシ
4190124 レコードが送信され838024.8 レコード/ 818.38 MB / )、 4.4 ミリ秒評価73.0 ミリ秒の最大レイテンシ
10000000 レコードが送信され737463.126844 記録/ 720.18 MB / )、 31.81 ミリレイテンシ681.00 MS 最大レイテンシ4 ms 50 Th126 ms 95th6​​04 ms 99th 99th 672 ms 99.9t
#test 消費者のパフォーマンス
$ bin / kafka- 消費- パフォーマー- テストSH -BROKER -LIST KAFKA -HOST PORT- メッセージ10000000- トピックテスト- トピック
始める時間終わり時間データ消費MBMBSECデータ消費nmsgnmsgリバランス時間MSフェッチ時間MSフェッチMBsecフェッチnmsg
2019-06-26 15 24 18 138、2019-06-26 15 24 23 805、9765.6202、1723.2434、10000000、1764602.0822 16、5651、1728.1225、1769598.3012

7。カフカは一般的にパフォーマンスチューニングを使用しました

(1)ディスクディレクトリの最適化

Kafkaの読み取りと書き込みの単位はパーティションであるため、トピックを複数のパーティションに分割すると、スループットが改善されます。しかし、ここには前提条件があります。つまり、異なるディスクに配置する必要があります(同じマシンにある場合があります)。複数のパーティションが同じディスクにある場合、複数のプロセスがディスク上に複数のファイルを同時に読み取り、書き込み、オペレーティングシステムがディスクの読み取りと書き込みを頻繁にスケジュールすることを意味します。つまり、ディスクの読み取りと書き込みの連続性が破壊されます。

最適化パラメーター: l **** og.dirs =/data/seqdata00/kafka/data、/data/seqdata01/kafka/data、/data/seqdata02/kafka/data。

(2)JVMパラメーター構成

CMSではなく最新のG1をゴミコレクターとして使用することをお勧めします。 Javaに推奨される最小バージョンはJDK 1.7U51です。

CMSと比較したG1の利点:

  • G1は、サーバー側に適したゴミコレクターであり、スループットと応答性のバランスをとります。
  • メモリパーティション化のさまざまな方法、エデン、生存者、および古い領域はもはや固定されておらず、メモリの使用がより効率的になります。 G1は、メモリ領域を分割することにより、メモリの断片化を効果的に回避します。
  • G1は、GCを使用してスレッドを一時停止できる時間を指定できます(厳密なコンプライアンスは保証されません)。 CMSは制御可能なオプションを提供しません。
  • CMSはFullGC後にのみ圧縮メモリを再マザーしますが、G1はリサイクルを収集してマージします。
  • CMSは古い地域でのみ使用できます。 Parnewは通常、若い清掃と組み合わせて使用​​されますが、G1は2種類のパーティションのリサイクルアルゴリズムを統合できます。

G1の適用シナリオ:

  • JVMは多くのメモリを取り上げます(少なくとも4G)。
  • アプリケーション自体が頻繁にメモリを適用してリリースし、それによって大量のメモリフラグメントを生成する場合。
  • GC時間により敏感なアプリケーションの場合。

現在、JVMパラメーターを使用しています。

(3)ログデータフラッシング戦略

プロデューサーの書き込みスループットを大幅に改善するには、定期的にファイルをバッチに記述する必要があります。

構成する2つのパラメーターがあります。

  • log.flush.interval.messages = 100000:プロデューサーが100,000個のデータを書き込むときはいつでも、データをディスクに洗い流します。
  • log.flush.interval.ms = 1000:1秒ごとにディスクをフラッシュします。

(4)ログ保持時間

Kafkaサーバーに多数のメッセージが書き込まれると、多くのデータファイルが生成され、大量のディスクスペースが占有されます。時間内にクリーンアップされていない場合、ディスクスペースが不十分な場合があります。 Kafkaはデフォルトで7日間保持されます。

パラメーター: log.retention.hours = 168。

<<:  Emissary Ingress を OPA と統合する方法

>>:  CIO やその他の IT リーダーがエッジ コンピューティングを活用してビジネスを強化するための 4 つの鍵

推薦する

クラウドコンピューティングのコスト見積もりで無視できないいくつかの要素

パブリック クラウドは、オンプレミスのインフラストラクチャと比較して、企業のコストが削減される傾向が...

レスポンシブ レイアウトの利点と欠点、またそれを設計する方法は何ですか?

レスポンシブ レイアウトといえば、誰もがある程度理解していると思います。レスポンシブ レイアウトは、...

医療ウェブサイトのSEOの終焉が近づいている

百度は今回、医療系ウェブサイトを本気で取り締まるつもりだ。6月28日から対策を講じている。当時、我々...

Open NOS によるクラウドとデータセンターの再構成の習得

企業は、データセンターの運用コストを削減しながら、生産性、ビジネスの回復力、持続可能性を向上させるこ...

Baidu リアルタイム ホットスポットは新しいプロモーション方法でしょうか?

はじめに: Baidu が「トップ 10 リアルタイム ホットスポット」(Baidu ホームページの...

初心者ウェブマスターの3ヶ月目の最適化体験

1 か月前、私は A5 Webmaster Network に「初心者 Web マスターによる新規サ...

Baiduの重みにおけるTaoxie.comとPaixie.comのカタログの違いに関する構造的議論

今日は8月10日です。実は今朝からTaoxie.comとPaixie.comを見てきました。なぜなら...

新しいサイトのスナップショットとソリューションの組み込みに影響する要因の解釈

新しいウェブサイトがオンラインになるたびに、ウェブマスターはこのウェブサイトに非常に失望しています。...

企業ブランド構築の成否は細部によって決まる

インターネットでお金を稼ぐ秘密の方法はありません。中国人の本質は他人を真似ること、または率直に言えば...

小紅書を宣伝するための4つの戦略は必ず読んでください

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス現在、小紅書はますます企...

ソーシャルメディアによって盗まれた膨大なトラフィック

オンライン マーケティングを理解する人が増えるにつれ、Baidu はトラフィックを増やすのに適した場...

小売業界におけるパブリッククラウドの情報セキュリティに関する議論

⚠️パブ​​リッククラウドのセキュリティインシデント⚠️過去 6 か月間に、小売業界ではパブリック ...

Dell Technologies のハイブリッド クラウド自動化とデジタル化が主流に

過ぎ去ったばかりの2021年は、テクノロジーがこれまで以上に重要であることを間違いなく証明しました。...

HUAWEI CLOUDはイノベーションと開発を加速し、金融業界のインテリジェンス化を支援します

フィンテックは金融業界を変革しています。デジタル時代において、金融業界は変化をさらに迅速に受け入れる...

RSA イノベーション サンドボックス インベントリ | STRATA - 分散型マルチクラウド ID

RSA カンファレンス 2021 は、サンフランシスコ時間 5 月 17 日に開催されます。 RSA...