基本的な紹介Apache Kafka は、LinkedIn が Scala と Java で開発し、Apache Software Foundation に寄贈したオープンソースのストリーム処理ソフトウェア プラットフォームです。 このプロジェクトの目的は、リアルタイムのデータ ストリームを処理するための、統合された高スループット、低レイテンシのプラットフォームを提供することです。 Kafka は Kafka Connect を通じて外部システムに接続でき、Kafka Streams を提供します。 「カフカの特徴」Kafka は、次の主な機能を備えた、分散型のパブリッシュ/サブスクライブ ベースのメッセージング システムです。 バージョン番号「Kafka バージョンの命名」公式サイトから Kafka をダウンロードすると、次のバージョンが表示されます。 前述のバージョン番号は、Kafka ソース コードをコンパイルするために使用される Scala コンパイラのバージョンです。 Kafka のサーバー側コードはすべて Scala で記述されており、オブジェクト指向プログラミングと関数型プログラミングの両方をサポートしています。 Scala で書かれたソースコードもコンパイル後は普通の .class ファイルになるので、Scala は JVM 言語であると言えます。 実際の Kafka のバージョン番号は 2.1.1 です。
最初の 2 はメジャー バージョン番号、つまり Major Version を表します。中央の 1 はマイナー バージョン番号またはセカンダリ バージョン番号、つまりマイナー バージョンを表します。最後の 1 はリビジョン番号、つまりパッチ番号を表します。 バージョン 1.0.0 のリリース後、Kafka コミュニティは、Kafka バージョンの命名規則が正式に 4 桁から 3 桁に進化したことを発表する記事を書きました。たとえば、バージョン 0.11.0.0 は 4 桁のバージョン番号です。 どちらのバージョンを使用する場合でも、サーバー バージョンとクライアント バージョンの一貫性を保つようにしてください。そうしないと、Kafka が提供するパフォーマンス最適化のメリットの多くを失うことになります。 「バージョン進化」バージョン 0.7: 最も基本的なメッセージ キュー機能のみを提供します。 バージョン 0.8: レプリケーション メカニズムが導入され、Kafka は完全に完成した分散型の信頼性の高いメッセージ キュー ソリューションになりました。 バージョン 0.9.0.0: 基本的なセキュリティ認証/承認機能を追加しました。 Java を使用してコンシューマ API を書き直しました。 Kafka Connect コンポーネントを導入しました。 バージョン 0.11.0.0: べき等プロデューサー API とトランザクション API を提供します。 Kafka メッセージ形式をリファクタリングします。 バージョン1.0と2.0: 主にKafka Streamsの改善 基本概念"テーマ"公開および購読の対象はトピックです。各ビジネス、各アプリケーション、さらには各データの種類ごとに専用のトピックを作成できます。 「生産者と消費者」トピックにメッセージを公開するクライアント アプリケーションはプロデューサーと呼ばれます。プロデューサー プログラムは通常、1 つ以上のトピックにメッセージを継続的に送信します。 これらのトピック メッセージをサブスクライブするクライアント アプリケーションは、コンシューマーと呼ばれます。コンシューマーは複数のトピックからのメッセージを同時にサブスクライブすることもできます。 ブローカクラスターは複数のブローカーで構成され、クライアントからのリクエストを受信して処理し、メッセージを保持する役割を担います。 複数の Broker プロセスを同じマシン上で実行することもできますが、異なる Broker プロセスを異なるマシン上で実行することが一般的です。このように、クラスター内のマシンがダウンした場合、そのマシン上で実行されているすべての Broker プロセスがダウンしても、他のマシン上の Broker プロセスは外部にサービスを提供し続けることができます。 「バックアップ機構」バックアップの考え方は非常にシンプルで、同じデータを複数のマシンにコピーすることであり、同じデータのこれらのコピーはレプリカと呼ばれます。 2種類のレプリカが定義されています: リーダーレプリカとフォロワーレプリカ 前者は外部の世界にサービスを提供し、クライアント プログラムと対話することを指します。一方、後者はリーダーレプリカに受動的に従うだけで、外界と対話することはできません。 "パーティション"パーティション化メカニズムとは、各トピックを複数のパーティションに分割することを指し、各パーティションは順序付けられたメッセージログのセットです。 プロデューサーによって生成された各メッセージは、1 つのパーティションにのみ送信されます。つまり、メッセージがデュアルパーティション トピックに送信された場合、メッセージはパーティション 0 またはパーティション 1 のいずれかに格納されます。 各パーティションの下に複数のレプリカを構成できますが、リーダーレプリカは1つだけ、フォロワーレプリカはN-1個までです。 プロデューサーはパーティションにメッセージを書き込みます。パーティション内の各メッセージの位置は変位と呼ばれます。 消費者団体複数のコンシューマーインスタンスがグループを形成してトピックのグループを消費する このトピック グループ内の各パーティションは、グループ内の 1 つのコンシューマー インスタンスによってのみ消費され、他のコンシューマー インスタンスはそれを消費できません。 同時に、従来のメッセージ エンジン システムの 2 つの主要モデルが実装されています。すべてのインスタンスが同じグループに属している場合は、メッセージ キュー モデルを実装します。 すべてのインスタンスが異なるグループに属している場合は、パブリッシュ/サブスクライブ モデルを実装します。 「コーディネーター:コーディネーター」いわゆるコーディネーターは、消費者グループに特化しており、グループのリバランスの実行、移転管理およびグループメンバー管理の提供を担当します。 具体的には、コンシューマー アプリケーションが変位を送信すると、実際にはコーディネーターが配置されているブローカーに変位が送信されます。同様に、コンシューマー アプリケーションが起動すると、コーディネーターが配置されているブローカーにさまざまな要求も送信されます。コーディネーターは、コンシューマー グループの登録やメンバー管理レコードなどのメタデータ管理操作を実行する責任を負います。 すべてのブローカーは、起動時に対応するコーディネーター コンポーネントを作成して起動します。 言い換えれば、「すべてのブローカーには独自のコーディネーター コンポーネントがあります。」 では、コンシューマー グループは、サービスを提供するコーディネーターがどのブローカーに属しているかをどのように判断するのでしょうか? Kafka 内部トピック __consumer_offsets を通じて。 現在、コンシューマー グループのコーディネーターが配置されているブローカーを決定するための Kafka のアルゴリズムには 2 つのステップがあります。
まず、Kafka はグループの group.id パラメータのハッシュ値を計算します。 たとえば、group.id が test-group に設定されているグループがある場合、その hashCode 値は 627841412 になります。 次に、Kafka は __consumer_offsets のパーティション数を計算します。これは通常 50 パーティションです。次に、ハッシュ値の係数を取り、絶対値、つまり abs(627841412 % 50) = 12 を計算します。 この時点で、__consumer_offsets トピックのパーティション 12 がこのグループのデータを保存する役割を担っていることがわかります。 パーティション番号を使用すると、__consumer_offsets トピックのパーティション 12 のリーダー コピーをどのブローカーが持っているかを確認するだけで済みます。このブローカーは私たちが探しているコーディネーターです。 消費者オフセット消費者の消費は進歩しており、各消費者は独自の消費者置換を持っています。 「リバランス:リバランス」コンシューマー グループ内のコンシューマー インスタンスがクラッシュすると、他のコンシューマー インスタンスが自動的にサブスクリプション トピック パーティションを再割り当てします。 再バランスは、Kafka コンシューマーが高可用性を実現するための重要な手段です。 「AR (割り当てられたレプリカ)」 : パーティション内のすべてのレプリカは、総称して AR と呼ばれます。 すべてのメッセージは最初にリーダー レプリカに送信され、その後フォロワー レプリカは同期のためにリーダーからメッセージをプルできます。 ただし、同期期間中、フォロワーはリーダーよりある程度遅れをとり、この時点ではフォロワーとリーダーは完全に同期しているわけではありません。 「OSR (Out Sync Replicas)」 : フォロワーレプリカがリーダーレプリカと完全に同期していない、または遅れているレプリカのセット 「ISR (In Sync Replicas)は AR のサブセットです。ISR 内のすべてのレプリカは、リーダーと完全に同期されたレプリカです。」 ISR 内のフォロワー レプリカがリーダー レプリカより大幅に遅れている場合は、ISR から削除されます。それ以外の場合、完全に同期されると、OSR から ISR セットに移動されます。 デフォルトでは、リーダー レプリカが失敗すると、ISR セット内のフォロワー レプリカのみが新しいリーダーとして選出される資格があり、OSR 内のレプリカには選出されるチャンスはありません (unclean.leader.election.enable で設定可能)。 「HW (高水準点) 」: 特定のメッセージ オフセットを識別する高水準点。コンシューマーは、このウォーターマーク オフセットの前にのみメッセージをプルできます。 次の図はログ ファイルを示しています。このログ ファイルには 9 つのメッセージしかありません。最初のメッセージのオフセット (LogStartOffset) は 0 で、最後のメッセージのオフセットは 8 です。オフセット 9 のメッセージは点線で表され、次に書き込まれるメッセージを表します。 ログ ファイルの HW は 6 です。つまり、コンシューマーはオフセットが 0 から 5 までのメッセージのみを取得できます。オフセットが 6 のメッセージはコンシューマーには表示されません。 「LEO (ログ終了オフセット)」 : 現在のログファイルに書き込まれる次のメッセージのオフセットを識別します。 上図では、オフセット 9 が現在のログ ファイルの LEO です。 LEO のサイズは、現在のログ パーティション内の最後のメッセージのオフセット値に 1 を加えた値になります。 パーティション ISR セット内の各レプリカは独自の LEO を維持し、ISR セット内の最小の LEO はパーティションの HW です。コンシューマーは HW の前でのみメッセージを消費できます。 システムアーキテクチャ「カフカの設計哲学」最も基本的なアーキテクチャは、プロデューサーが Kafka のトピックにメッセージを公開し、トピックのメッセージがブローカーに保存されるというものです。コンシューマーはトピックをサブスクライブし、ブローカーからのメッセージを消費します。次の図は、このシナリオをより直感的に説明しています。 「メッセージのステータス: 」 Kafka では、メッセージが消費されたかどうかのステータスが Consumer に保存されます。ブローカーは、メッセージが消費されるかどうか、また誰によって消費されるかについては気にしません。コンシューマーはオフセット値(パーティション内で消費される次のメッセージの位置を指す)を記録します。オフセットが正しく設定されていない場合、同じメッセージが複数回消費されたり、メッセージが失われたりする可能性があります。 「メッセージの永続性:」 Kafka は、非常に高いパフォーマンスでメッセージをローカル ファイル システムに永続化します。 「バッチ送信:」 Kafka は効率を向上するために、メッセージコレクション単位でのバッチ送信をサポートしています。 「プッシュ アンド プル: 」 Kafka のプロデューサーとコンシューマーはプッシュ アンド プル モードを採用しています。つまり、プロデューサーはブローカーにメッセージをプッシュし、コンシューマーはブローカーからメッセージをプルします。 「パーティション メカニズム (Partition):」 Kafka のブローカー側はメッセージのパーティション分割をサポートします。プロデューサーは、どのパーティションにメッセージを送信するかを決定できます。パーティション内のメッセージの順序は、プロデューサーがメッセージを送信する順序です。トピック内のパーティションの数は構成可能です。パーティションは、Kafka の高スループットを保証する上で重要です。 「システムアーキテクチャ」通常、Kafka アーキテクチャには、「複数のプロデューサー」、「複数のコンシューマー」、「複数のブローカー」、および「Zookeeper クラスター」が含まれます。 「プロデューサー」 : Kafka にメッセージを送信するプロデューサー。 「コンシューマー」 : 消費のために Kafka からメッセージをプルする役割を担うコンシューマー。 「ブローカー」 : Kafka サービス ノード。1 つ以上のブローカーが Kafka クラスターを形成します。 「Zookeeper クラスター」 : Kafka クラスターのメタデータやコントローラーの選択などの管理を担当します。 プロデューサーパーティションなぜパーティションするのですか?Kafka のメッセージ構成は、実際にはトピック、パーティション、メッセージの 3 レベルの構造です。 トピックの下にある各メッセージは 1 つのパーティションにのみ保存され、複数のパーティションに保存されることはありません。 実際、パーティショニングの役割は負荷分散機能を提供することであり、データをパーティショニングする主な理由はシステムの高いスケーラビリティを実現することです。 異なるパーティションを異なるノードのマシンに配置でき、データの読み取りおよび書き込み操作はパーティションの粒度で実行されるため、各ノード マシンは独自のパーティションの読み取りおよび書き込み要求処理を独立して実行できます。さらに、新しいノードマシンを追加することで、システム全体のスループットを向上させることもできます。 パーティション分割戦略は何ですか?「パーティショニング戦略とは、プロデューサーがどのパーティションにメッセージを送信するかを決定するアルゴリズムです。」 Kafka はデフォルトのパーティション分割戦略を提供し、パーティション分割戦略をカスタマイズすることもサポートしています。 「カスタムパーティション戦略」パーティション分割戦略をカスタマイズする場合は、プロデューサー側のパラメータpartitioner.classを明示的に構成する必要があります。 プロデューサー プログラムを作成するときに、org.apache.kafka.clients.producer.Partitioner インターフェースを実装する具体的なクラスを作成できます。 このインターフェースも非常にシンプルで、partition() と close() の 2 つのメソッドのみを定義します。通常は、最も重要なパーティション方法のみを実装する必要があります。 このメソッドのメソッド シグネチャを見てみましょう。
ここで、topic、key、keyBytes、value、valueBytes はすべてメッセージ データに属し、cluster はクラスター情報 (たとえば、現在の Kafka クラスター内にあるトピックとブローカーの数) を指します。 Kafka は、メッセージを分割し、どのパーティションに送信するかを計算できるように、多くの情報を提供します。 独自の実装クラスでパーティション メソッドを定義し、partitioner.class パラメータを独自の実装クラスの完全修飾名に設定している限り、プロデューサー プログラムはコード ロジックに従ってメッセージをパーティション分割します。 世論調査戦略ラウンドロビン戦略、つまり順次割り当てとも呼ばれます。 たとえば、トピックの下に 3 つのパーティションがある場合、最初のメッセージはパーティション 0 に送信され、2 番目のメッセージはパーティション 1 に送信され、3 番目のメッセージはパーティション 2 に送信されます。 4 番目のメッセージが生成されると、メッセージが再び開始され、パーティション 0 に割り当てられます。 これはいわゆるラウンドロビン戦略です。ラウンドロビン戦略は、Kafka Java プロデューサー API によって提供されるデフォルトのパーティション分割戦略です。 「ポーリング戦略は、優れた負荷分散パフォーマンスを備えています。常に、メッセージが可能な限りすべてのパーティションに均等に分散されることを保証できます。したがって、これはデフォルトで最も合理的なパーティション戦略であり、最もよく使用されるパーティション戦略の 1 つです。」 ランダム戦略ランダムネス戦略とも呼ばれます。いわゆるランダム性とは、メッセージを任意のパーティションにランダムに配置することを意味します。 パーティション メソッドのランダム戦略バージョンを実装する場合、非常に簡単で、必要なコードは 2 行だけです。
まずトピックのパーティションの合計数を計算し、それより小さい正の整数をランダムに返します。 本質的にはランダム戦略も各パーティションにデータを均等に分散することを目指しますが、実際のパフォーマンスではポーリング戦略よりも劣るため、「データの均等分散を追求する場合はポーリング戦略を使用する方がよい」とのことです。実際、ランダム戦略はプロデューサーの古いバージョンで使用されていたパーティショニング戦略であり、新しいバージョンではポーリングに変更されています。 「メッセージキー順序保存戦略」Kafka では、各メッセージに対して、キーと呼ばれるメッセージ キーを定義できます。 このキーは非常に重要な役割を果たします。顧客コード、部門番号、ビジネス ID など、明確なビジネス上の意味を持つ文字列にすることができます。メッセージのメタデータを表すためにも使用できます。 特に、Kafka がタイムスタンプをサポートしていなかった時代には、エンジニアがメッセージの作成時刻をキーに直接カプセル化していたシナリオもありました。 メッセージにキーが設定されると、同じキーを持つすべてのメッセージが同じパーティションに送信されるようになります。各パーティション内のメッセージは順番に処理されるため、この戦略はメッセージ キー順序付け戦略と呼ばれます。 この戦略を実装するパーティション メソッドもシンプルで、次の 2 行のコードのみが必要です。
上記の Kafka のデフォルトのパーティショニング戦略は、実際には 2 つの戦略を同時に実装します。キーが指定されている場合、デフォルトの実装はメッセージ キーの順序保持戦略です。キーが指定されていない場合は、ポーリング戦略が使用されます。 その他のパーティション戦略実際、もう 1 つ一般的な戦略があります。それは、いわゆる地理的位置に基づくパーティショニング戦略です。 もちろん、この戦略は一般に、大規模な Kafka クラスター、特に都市、国、さらには大陸にまたがるクラスターにのみ適用できます。 ブローカーの IP アドレスに基づいてカスタマイズされたパーティショニング戦略を実装できます。たとえば、次のコード:
すべてのパーティションから、リーダー コピーが南にあるすべてのパーティションを見つけ、その中からランダムに 1 つを選択してメッセージを送信できます。 プロデューサー圧縮アルゴリズム「Kafka はどのようにしてメッセージを圧縮するのですか?」現在、Kafka にはコミュニティで V1 と V2 と呼ばれる 2 つの主要なメッセージ形式があります。 バージョン V2 は Kafka 0.11.0.0 で正式に導入されました。 バージョンに関係なく、Kafka のメッセージ階層は、メッセージ コレクションとメッセージの 2 つのレイヤーに分かれています。 メッセージ セットには複数のログ項目が含まれており、ログ項目はメッセージが実際にカプセル化される場所です。 Kafka の基盤となるメッセージ ログは、一連のメッセージ コレクション ログ項目で構成されます。 Kafka は通常、個々のメッセージを直接操作せず、常にメッセージ コレクションのレベルで書き込みます。 「それでは、コミュニティがバージョン 2 を導入する目的は何でしょうか?」バージョン V2 では、主にメッセージの共通部分を抽出して外部メッセージ コレクションに配置するなど、バージョン V1 のいくつかの欠点を修正し、この情報を各メッセージに保存する必要がないようにしています。 たとえば、バージョン V1 では、各メッセージで CRC チェックを実行する必要がありますが、場合によってはメッセージの CRC 値が変更されます。 たとえば、メッセージのタイムスタンプ フィールドはブローカー側で更新される可能性があり、再計算された CRC 値もそれに応じて更新されます。たとえば、ブローカー側がメッセージ形式の変換を実行すると (主にクライアント プログラムの古いバージョンとの互換性のため)、CRC 値も変更されます。 このような状況を考慮すると、各メッセージに対して CRC 検証を実行する必要はなく、スペースを浪費するだけでなく、CPU 時間も遅延します。そのため、V2 バージョンでは、メッセージの CRC 検証がメッセージ収集層に移動されます。 圧縮に密接に関連するバージョン V2 のもう 1 つの改善点は、圧縮されたメッセージを保存する方法が変更されたことです。 バージョン V1 で圧縮されたメッセージを保存する以前の方法は、複数のメッセージを圧縮してから、外部メッセージのメッセージ本文フィールドに保存することでした。一方、バージョン V2 の方法は、メッセージ セット全体を圧縮することです。明らかに、後者は前者よりも圧縮効果が優れているはずです。 いつ圧縮しますか?Kafka では、圧縮はプロデューサー側とブローカー側の 2 つの場所で発生します。 プロデューサー プログラムで compression.type パラメータを構成すると、指定されたタイプの圧縮アルゴリズムが有効になります。 たとえば、次のコードは、GZIP を有効にして Producer オブジェクトを構築する方法を示しています。
ここで重要なコード行は props.put("compression.type", "gzip") であり、これはこのプロデューサーが使用する圧縮アルゴリズムが GZIP であることを示しています。 このように、プロデューサーの起動後に生成される各メッセージ セットは GZIP で圧縮されるため、Kafka ブローカー側のネットワーク転送帯域幅とディスク使用量を効果的に節約できます。 ブローカーがメッセージを再圧縮する可能性がある例外が 2 つあります。 「ケース 1: ブローカーがプロデューサーとは異なる圧縮アルゴリズムを指定します。」ブローカー側で異なる compression.type 値を設定すると、予期しない圧縮/解凍操作が発生する可能性があり、通常、ブローカー側の CPU 使用率の急増として現れるため、注意が必要です。 「ケース 2: ブローカー側でメッセージ形式の変換が行われます。」いわゆるメッセージ形式の変換は、主にコンシューマー プログラムの古いバージョンとの互換性のためです。 実稼働環境では、複数のバージョンのメッセージ形式を Kafka クラスターに同時に保存することが一般的です。 古いバージョンの形式との互換性を保つために、ブローカーは新しいバージョンのメッセージを古いバージョンの形式に変換します。 このプロセスには、メッセージの解凍と再圧縮が含まれます。 一般的に、このメッセージ形式の変換はパフォーマンスに大きな影響を与えます。ここでの圧縮に加えて、Kafka のゼロ コピー機能も失われます。 いつ解凍されますか?圧縮があれば、必ず減圧も必要です!一般的に、圧縮解除はコンシューマー プログラムで行われます。つまり、プロデューサーが圧縮されたメッセージをブローカーに送信した後、ブローカーはそれを受信してそのまま保存します。コンシューマー プログラムがメッセージのこの部分を要求した場合、ブローカーはそれをそのまま送信します。メッセージがコンシューマーに到達すると、コンシューマーはメッセージを解凍し、以前のメッセージに復元します。 「基本的なプロセス: プロデューサー側で圧縮、ブローカー側で保持、コンシューマー側で解凍。」注意: コンシューマー側での解凍に加えて、ブローカー側でも解凍が実行されます。 各圧縮メッセージ セットは、メッセージに対してさまざまな検証を実行するために、ブローカー側で書き込まれるときに解凍する必要があります。 この解凍は、特に CPU 使用率の点で、ブローカー側のパフォーマンスに一定の影響を与えることを認めなければなりません。 「各種圧縮アルゴリズムの比較」Kafka 2.1.0 より前では、Kafka は GZIP、Snappy、LZ4 の 3 つの圧縮アルゴリズムをサポートしていました。 2.1.0 以降、Kafka は Zstandard アルゴリズム (略称 zstd) を正式にサポートしています。 これは Facebook がオープンソース化した、超高圧縮率を実現できる圧縮アルゴリズムです。 実際の使用においては、GZIP、Snappy、LZ4、zstd にはそれぞれ独自の利点があります。 しかし、Kafka の場合、スループットの点では、LZ4 > Snappy > zstd および GZIP となります。圧縮率に関しては、zstd > LZ4 > GZIP > Snappy となります。 特に物理リソースの場合、Snappy アルゴリズムはネットワーク帯域幅を最も多く占有しますが、zstd は最も少なく占有します。 CPU 使用率の点では、アルゴリズムのパフォーマンスは同様ですが、Snappy アルゴリズムは圧縮時に CPU をより多く使用し、GZIP アルゴリズムは解凍時に CPU をより多く使用する可能性があります。 ベストプラクティス圧縮を有効にする適切なタイミングはいつですか? 圧縮を有効にするための条件の 1 つは、Producer プログラムが実行されているマシン上の CPU リソースが十分である必要があることです。 十分な CPU リソースの条件に加えて、環境の帯域幅リソースが限られている場合は、圧縮を有効にすることをお勧めします。 消費者団体「コンシューマー グループは、Kafka が提供するスケーラブルでフォールト トレラントなコンシューマー メカニズムです。」グループであるため、グループ内には複数のコンシューマーまたはコンシューマー インスタンスが存在する必要があり、それらはグループ ID と呼ばれる共通の ID を共有します。 グループ内のすべてのコンシューマーは、サブスクライブされたトピックのすべてのパーティションを消費するように調整します。
「消費者グループの3つの特徴」
コンシューマー グループが複数のトピックをサブスクライブする場合、グループ内の各インスタンスはトピックのすべてのパーティションをサブスクライブする必要はありません。一部のパーティションからのメッセージのみを消費します。 コンシューマー グループは互いに独立しており、相互に影響を与えません。互いに干渉することなく、同じトピック セットをサブスクライブできます。 「Kafka は Consumer Group メカニズムのみを使用しますが、従来のメッセージ エンジン システムの 2 つの主要モデルを同時に実装します」:すべてのインスタンスが同じグループに属している場合は、メッセージ キュー モデルを実装します。 すべてのインスタンスが異なるグループに属している場合は、パブリッシュ/サブスクライブ モデルを実装します。 「グループにはコンシューマー インスタンスがいくつ必要ですか?」 「理想的には、コンシューマー インスタンスの数は、グループがサブスクライブするパーティションの合計数と等しくする必要があります。」 コンシューマー グループが 3 つのトピック (A、B、C) をサブスクライブし、そのパーティション番号が 1、2、3 であると仮定すると、スケーラビリティを最大化できるため、通常はグループに 6 つのコンシューマー インスタンスを設定するのが理想的です。 「消費者グループにとって、Kafka はどのようにして置き換えを管理するのでしょうか?」"オフセット"古いバージョンの Consumer Group は、ZooKeeper に変位を保存します。 Apache ZooKeeper は、さまざまな調整管理を実装するために Kafka が大きく依存している分散調整サービス フレームワークです。 ZooKeeper 外部のシステムに変位を保存する最も明らかな利点は、Kafka Broker 側の状態保存のオーバーヘッドが削減されることです。 しかし、徐々に問題が見つかりました。ZooKeeper などのメタフレームワークは頻繁な書き込み更新には適していないのに対し、コンシューマー グループの変位更新は非常に頻繁な操作です。 この書き込み操作のスループットが高いと、ZooKeeper クラスターのパフォーマンスが大幅に低下する可能性があります。 そのため、新しいバージョンの Consumer Group では、Kafka コミュニティは Consumer Group の変位管理方法を再設計し、変位を Kafka の内部トピックに格納する方法を採用しました。 この内部トピックは __consumer_offsets です。 消費者戦略「最初は丸い」デフォルトでは、ラウンドロビンとも呼ばれ、同じコンシューマー グループに対して、コンシューマーが消費するパーティションを決定するためにラウンドロビン割り当て方法が使用されます。 「2番目のタイプはレンジと呼ばれます」コンシューマー グループの場合、消費方法はパーティションの合計数をコンシューマーの合計数で割ることによって決定されます。一般的に、均等に分割できない場合は、残りのパーティションを最初から割り当てることが多いです。 「3番目のタイプはスティッキーと呼ばれます」0.11.x で新しく追加されました。前の 2 つとはあまり似ていません。レンジでの昇華です。新しい消費者が同じグループに参加したり、古い消費者が同じグループから脱退したりすると、前の 2 つの消費者が消費者の消費モードを最初から決定し始めます。ただし、新しいコンシューマーが同じグループに参加したり、古いコンシューマーが同じグループから離脱したりしても、Sticky は新しい範囲の割り当てを直接開始しません。代わりに、既存の消費者の元の消費戦略を維持し、退出する消費者が消費したパーティションを既存の消費者に均等に分配します。同じことは、他の既存消費者の消費戦略から切り離された新規消費者にも当てはまります。 変位提出パーティションに 10 個のメッセージがあり、変位がそれぞれ 0 から 9 であると仮定します。 Consumer アプリケーションは 5 つのメッセージを消費しました。これは、Consumer が 0 から 4 までの変位を持つ 5 つのメッセージを消費したことを意味します。この時点で、Consumer の変位は 5 であり、次のメッセージの変位を指しています。 コンシューマーは複数のパーティションから同時にデータを消費できるため、変位の送信は実際にはパーティションの粒度で実行されます。つまり、「コンシューマーは、割り当てられたパーティションごとに独自の変位データを送信する必要があります。」 「変位送信は自動送信と手動送信に分かれており、消費者の観点から見ると、変位送信は同期送信と非同期送信に分かれています。」 変位の自動コミットを有効にする方法: コンシューマー側に enable.auto.commit パラメーターがあります。これを true に設定するか、まったく設定しないでください。 デフォルト値は true であるため、Java Consumer はデフォルトで自動的に変位を送信します。 自動コミットが有効になっている場合は、コンシューマー側にもパラメータ auto.commit.interval.ms があります。 デフォルト値は 5 秒です。つまり、Kafka は 5 秒ごとにシフトを自動的にコミットします。
上記のコードの 3 行目と 4 行目は、変位の自動送信を有効にする方法です。 変位の手動コミットを有効にするには、enable.auto.commit を false に設定します。 変位を手動で送信するには、対応する API を呼び出す必要もあります。最も単純な API は「KafkaConsumer#commitSync()」です。 このメソッドは、KafkaConsumer#poll() によって返された最新の変位をコミットします。 名前が示すように、これは同期操作です。つまり、メソッドは、変位が正常に送信されるまで待機してから戻ります。 送信プロセス中に例外が発生した場合、このメソッドは例外情報をスローします。 次のコード スニペットは、commitSync() の使用方法を示しています。
enable.auto.commit が true に設定されると、Kafka は、poll メソッドが呼び出されたときに、最後のポーリングによって返されたすべてのメッセージがコミットされるようにします。 順序に関して言えば、ポーリング方式のロジックは、まず前のメッセージ バッチの置き換えを送信し、次に次のメッセージ バッチを処理することで、消費損失がないことを保証します。 しかし、自動的に置換を行うことの問題点の1つは、 「重複消費につながる恐れがあります。」変位を手動で送信する利点は、より柔軟性が高く、変位の送信のタイミングと頻度を完全に制御できることです。 ただし、これには欠陥もあります。つまり、commitSync() が呼び出されると、リモート ブローカーが送信結果を返すまでコンシューマー プログラムはブロックされた状態になり、この状態は終了します。 この問題を考慮して、Kafkaコミュニティは、移動を手動で送信するための別のAPIメソッドを提供します。 「kafkaconsumer#commitasync()」。名前から、それは同期ではなく、非同期操作です。 commitasync()を呼び出した後、ブロックせずにすぐに戻り、消費者アプリケーションのTPSに影響しません。 非同期であるため、Kafkaは、ロギングや処理の例外など、サブミット後のロジックを実装するためのコールバック関数を提供します。 次のコードスニペットは、commitasync()を呼び出す方法を示しています。
commitasyncの問題は、何かがうまくいかないときに自動的に再試行しないことです。 明らかに、それが手動での提出である場合、2つの理由により、最良の効果を達成するためにコミットティンとコミットメントを組み合わせる必要があります。
次のコードを見てみましょう。これは、手動提出のために2つのAPIメソッドを組み合わせる方法を示しています。
このようなシナリオ:ポーリング方法は500メッセージではなく、5000メッセージを返します。 まあ、あなたは間違いなくシフトを送信する前に5,000のメッセージすべてを処理したくありません。なぜなら、中央にエラーがある場合、以前のすべての処理を再度実行する必要があるからです。 たとえば、5,000のメッセージの前の例では、大量のメッセージを再消費することを避けるために、100個のメッセージを処理した後に変位をコミットすることをお勧めします。 Kafka Consumer APIは、手動提出のためのこのような方法を提供します:commitsync(Map)およびCommitAsync(MAP)。 それらのパラメーターはマップオブジェクトであり、キーはトピックパルティション、つまり消費されるパーティションであり、値は主に変位データを保存するオフセットAndmetadataオブジェクトです。
ここでは、Commitasyncを例として、コードを表示する例として取ります。実際、Commitsyncの呼び出し方法はそれとまったく同じです。
このプログラムは、最初に消費者消費プロセス中に送信されるパーティション変位を保存するMAPオブジェクトを作成し、次にメッセージを1つずつ処理し、送信する変位値を構築します。 コードの最後の部分は、変位の提出を行うことです。カウンターが設定され、100のメッセージが蓄積されるたびに変位が均一に提出されます。 パラメーターのないcommitasyncを呼び出すのとは異なり、MAPオブジェクトパラメーターを使用したcommitAsyncがここで呼び出され、細粒の変位提出を実行します。 このようにして、このコードは、投票方法によって返されるメッセージの総数によって制限されることなく、処理された100のメッセージごとに変位を送信できます。 リバランス「(リバランス)リバランスは、基本的に、消費者グループの下のすべての消費者が、購読されたトピックの各パーティションを割り当てるコンセンサスに到達する方法を指定するプロトコルです。」たとえば、グループの下には20の消費者インスタンスがあり、100のパーティションを持つトピックを購読しています。 通常の状況では、Kafkaは各消費者に平均5つのパーティションを割り当てます。この分布プロセスはリバランスと呼ばれます。 「リバランスのための3つのトリガー条件があります。」
リバランスが発生すると、グループの下のすべての消費者インスタンスが一緒に調整して参加します。 割り当て戦略現在、Kafkaはデフォルトで3つの割り当て戦略を提供していますが、それぞれには特定の利点と短所があります。コミュニティは、これらの戦略を継続的に改善して、最も公平な割り当て戦略が提供されるようにします。つまり、各消費者インスタンスは比較的平均的なパーティション数を取得できます。 たとえば、グループに10の消費者インスタンスがある場合、100のパーティションが消費されます。理想的な割り当て戦略は、当然、インスタンスごとに平均10のパーティションを取得することです。 これは公正な分配戦略と呼ばれます。 消費者グループのリバランスのプロセスを説明するための簡単な例を教えてください。 AとBなどの2つの消費者グループがあるとします。3番目のメンバーCが参加すると、カフカはデフォルトの割り当て戦略に従ってリバランスをトリガーし、A、B、Cにパーティションを再割り当てします。 リバランス後の配分はまだ公平です。つまり、各消費者インスタンスは2つのパーティションの消費権を取得します。 リバランスプロセス中、すべての消費者インスタンスは消費を停止し、リバランスが完了するのを待ちます。これは、人々によって批判されるリバランスの側面です。 現在、リバランスはすべての消費者インスタンスに参加するように設計されており、すべてのパーティションが再割り当てされています。 「コーディネーターは、どのような状況下で消費者のインスタンスが吊り下げられていると考えているので、グループから撤退する必要があると考えていますか?」消費者グループがリバランスを完了した後、各消費者インスタンスは定期的にハートビートリクエストをコーディネーターに送信し、それがまだ生きていることを示します。 消費者インスタンスがこれらのハートビートリクエストを時間内に送信できない場合、コーディネーターは消費者が死んでいると考え、グループから削除し、新しいリバランスを開始します。 session.timeout.msと呼ばれる消費者側にパラメーターがあります。 このパラメーターのデフォルト値は10秒です。つまり、コーディネーターが10秒以内にグループの下で消費者インスタンスのハートビートを受け取らない場合、消費者インスタンスがハングしたと考えるでしょう。 このパラメーターに加えて、Consumerは、HeartBeat.Interval.msであるHeartBeatリクエストを送信する頻度を制御できるパラメーターも提供します。 この値が小さいほど、消費者インスタンスの頻度が高くなります。 頻繁にハートビートリクエストを送信すると、追加の帯域幅リソースが消費されますが、各消費者インスタンスにリバランスを有効にするために通知する現在のコーディネーターの方法は、リバランス_に必要なフラグを心拍リクエストの応答本体にカプセル化することであるため、リバランスが現在有効になっているかどうかをより迅速に知ることができるという利点があります。 上記の2つのパラメーターに加えて、消費者側にはパラメーターもあります。これは、消費者の実際の消費能力がリバランスに与える影響を制御するために使用されます。 消費者側のアプリケーションがポーリング方法を呼び出すときの2回の間の最大時間間隔を制限します。 デフォルト値は5分で、消費者プログラムが5分以内に投票方法によって返されたメッセージを消費できない場合、消費者はグループを離れるという要求を積極的に開始し、コーディネーターも新しいリバランスを開始します。 「リバランスを避けるための構成」最初のタイプのリバランスは、ハートビートを時間内に送信できなかったことによって引き起こされ、消費者がグループから追い出されました。 したがって、「session.timeout.msおよびheartbeat.interval.ms」の値を設定できます。
session.timeout.msから6sを設定することは、主にコーディネーターがすでに吊り下げられた消費者をより速く見つけることができるようにするためです。 「2番目のタイプのリバランスは、消費者が時間を長く費やすことによって引き起こされます。」消費者がこれらのメッセージを処理するのに時間がかかりすぎるため、消費者がリバランスを引き起こさないように、ビジネス処理ロジックに十分な時間を残す必要があります。 ConsumerOffsets「Kafkaは、消費者の変位データを通常のKafkaメッセージとして__Consumer_Offsetsに提出します。」「__CONSUMER_OFFSETSの主な機能は、Kafka消費者の変位情報を保存することです。」この提出プロセスは、耐久性が高いだけでなく、高周波書き込み操作をサポートする必要があります。 __CONSUMER_OFFSETSテーマは、通常のKafkaテーマです。手動で作成したり、変更したり、削除したりすることもできます。 __CONSUMER_OFFSETSテーマは通常のKafkaのテーマですが、「そのメッセージ形式はKafka自体によって定義されています」。ユーザーはそれを変更することはできません。つまり、このトピックへのメッセージを自由に記述することはできません。これは、書くメッセージがKafkaによって指定された形式を満たさないと、Kafkaが内部でそれを解析することができず、ブローカーがクラッシュするからです。 Kafka Consumerには、変位を送信するのに役立つAPIがあります。つまり、__CONSUMER_OFFSETSトピックにメッセージを書き込むことができます。プロデューサーを自分で書いて、トピックに自由にメッセージを送信しないでください。 __CONSUMER_OFFSETSには3つのメッセージ形式があります。
2番目の形式には、排他的な名前があります。Tombstoneメッセージ、Delete Markとも呼ばれます。その主な機能は、メッセージ本文がnull、つまり空のメッセージ本文です。 消費者グループの下のすべての消費者インスタンスが停止し、変位データが削除されると、Kafkaは__Consumer_Offsetsトピックの対応するパーティションにTombstoneメッセージを書き込み、グループの情報が完全に削除されていることを示します。 __consumer_offsetsはどのように作成されますか? 一般的に言えば、「Kafkaは、Kafkaクラスターの最初の消費者プログラムが開始されると、自動的に変位トピックを作成します。」 「デフォルトこのトピックのパーティションの数は50で、コピーの数は3です。」現在、Kafkaの消費者に変位を提出する方法は2つあります。「自動的に変位を送信し、手動で送信する変位を手動で送信しています。」 消費者側には、enable.auto.commitというパラメーターがあります。値が真である場合、消費者は静かにバックグラウンドで定期的に変位を提出します。提出間隔は、排他的なパラメーターauto.commit.interval.msによって制御されます。 自動的に変位を提出することには大きな利点があります。つまり、トラブルを節約します。メッセージの消費が失われないようにするために、変位の提出について心配する必要はありません。 しかし、これも不利であり、多くの柔軟性と制御性を失い、消費者側の変位管理をまったく制御することはできません。 Kafka Consumer APIは、Consumer.Commitsyncなど、変位の提出方法を提供します。 これらのメソッドが呼び出されると、Kafkaは__consumer_offsetsトピックに対応するメッセージを書き込みます。 変位を自動的に送信することを選択した場合、問題がある可能性があります。消費者が常に起動している限り、変位トピックへのメッセージを無期限に書き込みます。 「極端な例を挙げてください。」消費者が現在特定のトピックから最新のメッセージを消費しており、変位が100であると仮定します。その後、このトピックで新しいメッセージが生成されないため、消費者は消費するメッセージがないため、変位は常に100に維持されます。 変位が自動的に送信されるため、変位トピックは常に変位= 100でメッセージを書き込みます。 明らかに、Kafkaはこのタイプのメッセージの最新のものを保持するだけで、以前のメッセージを削除することができます。 これには、カフカが変位をテーマにしたメッセージの特性に関するメッセージ削除戦略を持つ必要があります。そうしないと、そのようなメッセージは最終的にディスク全体を破裂させます。 「コンパクト戦略」Kafkaは「コンパクト戦略」を使用して、__CONSUMER_OFFSETSトピックで期限切れのメッセージを削除して、トピックが無期限に膨張しないようにします。 たとえば、同じキーの2つのメッセージM1とM2の場合、M1の送信時間がM2より早い場合、M1は期限切れのメッセージです。 コンパクトのプロセスは、ログ内のすべてのメッセージをスキャンし、これらの期限切れのメッセージを排除し、残りのメッセージを一緒に整理することです。 コンパクトプロセスを説明するために、公式Webサイトの写真をここに投稿します。 図に変位が0、2、3のメッセージのキーはK1です。コンパクト後、パーティションは最新の送信であるため、3の変位でメッセージを保存する必要があります。 「Kafkaは、Compactが条件を満たす削除データがあるかどうかを確認するのを待っているトピックを定期的に検査するための特別な背景スレッドを提供します。」この背景スレッドは、ログクリーナーと呼ばれます。 多くの実際の生産環境には、ディスクスペースが多すぎる移動テーマの無制限の拡張などの問題があります。環境にこの問題がある場合は、ログクリーナースレッドのステータスを確認することをお勧めします。通常、このスレッドは、このスレッドの失効によって引き起こされます。 コピーメカニズムKafka Replicaメカニズムの定義によれば、同じパーティションの下のすべてのレプリカは同じメッセージシーケンスを保持し、これらのレプリカは異なるブローカーに散らばっており、ブローカーのダウンタイムによって引き起こされるデータの利用不可能性と闘うことができます。 以下は、3人のブローカーを備えたKafkaクラスター上のレプリカの分布を示しています。 この写真から、トピック1パーティション0の3つのコピーが3つのブローカーに散らばっていることがわかり、他のトピックパーティションのコピーもさまざまなブローカーに散らばっているため、データ冗長性を達成します。 「コピーロール」Kafkaでは、レプリカはリーダーレプリカとフォロワーレプリカの2つのカテゴリに分かれています。 各パーティションは、リーダーコピーと呼ばれるコピーが作成されるとコピーが選出され、残りは自動的にフォロワーコピーと呼ばれます。 カフカでは、フォロワーのコピーは外の世界に提供されていません。これは、消費者や生産者からの読み取りおよび書き込みリクエストにフォロワーコピーが応答できないことを意味します。すべてのリクエストは、リーダーのコピーによって処理される必要があります。または、すべての読み取りおよび書き込みリクエストは、リーダーのコピーがあるブローカーに送信する必要があり、ブローカーは取り扱いを担当します。 フォロワーレプリカは、クライアントリクエストを処理しません。その唯一のタスクは、リーダーレプリカからメッセージを「プル」し、それを独自のコミットログに書き込み、それによりリーダーレプリカとの同期を実現することです。 リーダーのダンジョンが死んだとき、またはリーダーのダンジョンがダウンしているブローカーが死んだとき、カフカはZookeeperが提供する監視機能に頼ってリアルタイムでそれを感じ、すぐに新しいリーダーの選挙の新しいラウンドを開始し、新しいリーダーとしてフォロワーダンジョンの1つを選択できます。古いリーダーのコピーが再起動された後、フォロワーコピーとしてクラスターにのみ追加できます。 クライアントユーザーの場合、Kafkaのフォロワーコピーは効果がありません。なぜカフカはそれをこのように設計したのですか? このレプリカメカニズムには2つの利点があります。 1。「読み取りを実装してください」。名前が示すように、いわゆる読み取りごとに、プロデューサーAPIを使用してKafkaにメッセージを正常に書き込むと、すぐに消費者APIを使用して、作成したメッセージを読み取ります。 2。「単調な読み取りを実現するのに便利」。現在、リーダーのレプリカデータを非同期に引っ張る2つのフォロワーレプリカF1とF2があるとします。 F1がリーダーから最新のニュースを引き出し、F2がまだ時間内にそれを引いていない場合、消費者が最初にF1からメッセージを読み取り、次にF2からメッセージを引く場合、この現象を見るかもしれません。 ただし、すべての読み取りリクエストがリーダーによって処理された場合、Kafkaは単調な読み取りの一貫性を簡単に達成できます。 ISRメカニズムISRレプリカコレクションと呼ばれるインシンクレプリカ。 ISRのレプリカはすべて、リーダーと同期されています。それどころか、ISRにないフォロワーのレプリカは、リーダーと同期していないと見なされます。
リーダーのコピーは当然ISRにあります。言い換えれば、「ISRは単なるフォロワーレプリカのコレクションではなく、リーダーのレプリカを含める必要があります。場合によっては、ISRにはリーダーのレプリカのみがあります。」 さらに、ISRに入ることができるフォロワーコピーは、特定の条件を満たす必要があります。 「ブローカー側のパラメーターReplica.lag.time.max.msパラメーター値の合格」。このパラメーターの意味は、フォロワーコピーがリーダーコピーの後ろに遅れをとる場合の最大時間間隔、および現在のデフォルト値が10秒であることです。 つまり、フォロワーのコピーがリーダーのコピーに10秒以上遅れている限り、カフカはフォロワーコピーがリーダーと同期していると考えています。 フォロワーレプリカの唯一の仕事は、リーダーのレプリカから常にメッセージを引き出し、それを独自のコミットログに書き込むことです。 コピーがリーダーの進捗状況にゆっくりと追いつくと、ISRに追加することができます。 ISRは動的に調整されたコレクションであり、静的に変更されていません。 汚れたリーダーの選挙「カフカは、ISRにないすべての生き残ったコピーを同期コピーとして呼びます。」 一般的に言えば、非同期のレプリカはリーダーに遅れすぎているので、これらのレプリカを新しいリーダーとして選択すると、データが失われる可能性があります。 結局のところ、これらのレプリカに保存されたメッセージは、古いリーダーのレプリカにはるかに遅れています。 カフカでは、そのようなコピーを選出するプロセスは、不正な指導者選挙と呼ばれています。 「ブローカー側のパラメーターunclean.Leader.Election.Enable Control control control control constry constrean constry constrean constry constry constry constar汚れたリーダーの選挙をオンにすると、データの損失が引き起こされる可能性がありますが、利益は、外の世界へのサービスの提供を停止しないように、パーティション化されたリーダーのコピーが存在しないようにし、高可用性を向上させることです。それどころか、汚れたリーダーの選挙を禁止する利点は、データの一貫性を維持し、メッセージの損失を回避するが、高可用性を犠牲にしていることです。 選挙をコピーしますKafkaクラスターのトピックパーティションおよびレプリカリーダーの設定については、クラスターの全体的な負荷容量バランスを考慮する必要があります。各パーティションのレプリカリーダーは、さまざまなブローカーで可能な限り割り当てられます。これにより、同じブローカーの複数のリーダーが回避され、クラスター内の不均衡なブローカーが生まれます。 Kafkaは、優先レプリカの概念を紹介しました。優先レプリカは、AR(パーティション内のすべてのレプリカ)コレクションリストの最初のレプリカを意味します。理想的な状態では、レプリカはパーティションのリーダーレプリカです。 たとえば、Kafkaクラスターは3人のブローカーで構成され、トピックパート版と呼ばれるトピックを作成し、パーティションを3に設定し、レプリカの数は3、パーティション0のARリストは[1,2,0]、パーティション0の優先レプリカは1です。 Kafkaはマルチレプリカメカニズムを使用して信頼性を向上させますが、リーダーレプリカのみが外の世界に読み取りおよび書き込みサービスを提供し、フォローレプリカはメッセージの同期のみを行います。 「パーティションのリーダーレプリカが利用できない場合、パーティション全体が利用できないことを意味します。現時点では、サービスを提供するために、新しいリーダーのレプリカをフォロワーレプリカから選出する必要があります。」「テーマを作成するとき、パーティションのテーマとコピーは、カフカの各ブローカーに可能な限り均等に公開されます。」たとえば、3つのブローカーノードを含むKafkaクラスターに3のパーティションとレプリカ係数が3のトピックパート版を作成すると、リーダーのレプリカは3つのブローカーノードに均等に配布されます。 「同じパーティションでは、同じブローカーノードに複数のコピーが表示されません。」リーダーレプリカがパーティションのリーダーノードに配置されているノードと、フォロワーレプリカがフォロワーノードに配置されているノードを呼び出すことができます。 上記の例では、パーティション0のリーダーノードはbroker1、パーティション1のリーダーノードはbroker2、パーティション2のリーダーノードはbroker0です。 パーティションリーダーノードが失敗すると、フォロワーノードの1つが新しいリーダーノードとして選出されます。 元のリーダーのノードが復元されると、フォロワーノードになり、バランスの取れたクラスター負荷につながる可能性があります。 たとえば、パーティション1のリーダーノードBroker2がクラッシュし、Broker1のパーティション1フォロワーノードが新しいリーダーノードとして選出されます。 broker2が復元されると、kafkaクラスターステータスは次のとおりです。 現時点では、Broker1の負荷が大きく、Broker2には負荷がないことがわかります。 「上記の負荷の不均衡の状況を解決するために、Kafkaは優先レプリカ選挙をサポートしています。これは、パーティションが存在するARコレクションの最初のレプリカを指します。」たとえば、上記のパーティション1では、そのARセットは[2,0,1]であり、パーティション1の優先コピーがBroker2にあることを示しています。 理想的には、優先レプリカはリーダーのレプリカでなければなりません。 Kafkaは、優先レプリカのバランスの取れた分布を保証します。これは、ブローカーノードがダウンしているかどうかとは関係ありません。 「優先レプリカの選挙は、パーティションリーダーのレプリカを可能な限り選出するときに、優先度のレプリカをリーダーにすることです。」上記の状況に応じて、優先順位のレプリカ選挙を再びトリガーする限り、パーティションの負荷分散を確保できます。 Kafkaは、自動優先度のレプリカ選挙機能をサポートしており、優先レプリカの選挙運用はデフォルトで5分ごとにトリガーされます。 ネットワーク通信モデルブローカーには、新しい接続の到着を聴くためのアクセプター(メインリアクター)がいます。新しい接続との接続を確立した後、この接続を管理するためにプロセッサ(サブリークター)のポーリングと選択。 プロセッサは、管理する接続を聴きます。イベントが到着すると、読み取りはリクエストにカプセル化され、リクエストを共有リクエストキューに入れます。 次に、IOスレッドプールは、キューからリクエストを継続的に取得し、実際の処理を実行します。処理後、応答は対応するプロセッサの応答キューに送信され、プロセッサはクライアントへの応答を返します。 各リスナーには、1つのアクセプタースレッドのみがあります。これは、ロジックが多すぎず、非常に軽量である新しい接続として再配布され、再配布されるだけです。 プロセッサは、Kafkaのネットワークスレッドと呼ばれます。デフォルトのネットワークスレッドプールには3つのスレッドがあり、対応するパラメーターはnum.network.threadsであり、実際のビジネスダイナミクスに応じて増加および減少させることができます。 IOスレッドプール、つまり実際の処理を実行するKafkareQuestHandlerpoolもあり、対応するパラメーターはnum.io.threads、デフォルト値は8です。 IOスレッドが処理されると、応答は対応するプロセッサに配置され、プロセッサはクライアントへの応答を返します。 ネットワークスレッドとIOスレッド間で使用されるクラシックプロデューサー - 消費者モデルは、リクエストの処理に使用される共有要求キューであろうと、IO処理後に返された応答であるかどうかを確認できます。 iDepotency「iDempogenic Producer」Kafka では、プロデューサーはデフォルトではべき等ではありませんが、べき等なプロデューサーを作成できます。 実際、バージョン0.11.0.0によって導入された新機能です。その前に、Kafkaがパーティションにデータを送信したとき、同じメッセージが複数回送信された可能性があり、メッセージが複製されました。 0.11以降、生産者の等式を指定する方法は非常に簡単です。 1つのパラメーターを設定するだけで、つまり、
enable.idempotenceがtrueに設定された後、プロデューサーは自動的にidempotentプロデューサーにアップグレードし、他のすべてのコードロジックを変更する必要はありません。 Kafkaは、重複を削除するために繰り返しメッセージを作成するのに自動的に役立ちます。 基礎となる層の特定の原則は非常に単純です。これは、時間を交換するためにスペースを使用するという古典的な最適化のアイデアです。つまり、ブローカー側のより多くのフィールドを節約します。 プロデューサーが同じフィールド値でメッセージを送信すると、ブローカーはメッセージが繰り返されていることを自動的に知ることができ、バックグラウンドで静かに捨てることができます。 「IDEPOTENT生産者の範囲」まず第一に、単一のパーティションでのみを確保することができます。つまり、特定のトピックの1つのパーティションに複製メッセージがないことを保証でき、複数のパーティションのiDempotenceを実現できません。 第二に、単一セッションでのみべき等性を実現でき、セッション間では実現できません。 ここでは、プロデューサープロセスの実行としてセッションを理解できます。 Producer プロセスを再起動すると、このべき等性の保証は失われます。 取引Kafkaはまた、バージョン0.11以来のトランザクションのサポートを提供しており、現在、主に読み取られた隔離レベルで物事を行っています。 複数のメッセージがターゲットパーティションに原子的に記述されるようにすることができ、消費者がトランザクションによって正常に送信されたメッセージのみを表示できるようにすることもできます。 「トランザクションプロデューサー」トランザクションプロデューサーは、メッセージが原子的に複数のパーティションに記述されるようにすることができます。 このメッセージのバッチは、正常に記述されるか、すべて失敗します。さらに、トランザクションプロデューサーはプロセスの再開を恐れていません。 プロデューサーが再起動した後、Kafkaはまだメッセージを一度に正確に送信することを保証します。 トランザクションプロデューサーを設定する方法も非常に簡単であり、2つの要件を満たすことができます。
さらに、このコードに示すように、プロデューサーコードで調整する必要があります。
通常のプロデューサーコードと比較して、トランザクションプロデューサーの特徴は、それぞれトランザクションの初期化、トランザクション開始、トランザクションコミット、トランザクションの終了に対応する、開始、開始、コミットトランザクション、アボートトランザクションなど、一部のトランザクションAPIを呼び出すことです。 このコードは、レコード1とRecord2がトランザクションとしてKafkaに提出されるようにすることができます。それらのすべてが正常に提出されるか、すべて書面が失敗します。 実際、書き込みが失敗したとしても、Kafkaはそれらを基礎となるログに書き込みます。つまり、消費者はこれらのメッセージをまだ表示します。 分離があります。レベルパラメーターには2つの値があります。
インターセプター「Kafkaインターセプターは、生産者インターセプターと消費者インターセプターに分けられます。」プロデューサーインターセプターを使用すると、メッセージを送信する前にインターセプターロジックを埋め込み、メッセージが正常に送信された後に埋め込むことができます。 消費者インターセプターは、メッセージを消費する前および変位を送信した後、特定のロジックの作成をサポートします。 一連のインターセプターを大規模なインターセプターに接続でき、Kafkaは追加の順にインターセプターロジックを順番に実行します。 現在のKafkaインターセプター設定法は、パラメーター構成によって完了します。生産者と消費者は、両端に同じパラメーターインターセプターを持っています。クラスのグループのリストを指定し、各クラスは特定のロジックのインターセプター実装クラスです。
これら2つのクラスと自分で書くすべてのプロデューサーエンドインターセプター実装クラスは、org.apache.kafka.clients.producer.producerinterceptorインターフェイスを継承する必要があります。 このインターフェイスはKafkaによって提供されており、2つのコアメソッドがあります。
同様に、消費者インターセプターを指定することは同じ方法ですが、特定の実装クラスはorg.apache.kafka.clients.consumer.consumerInterceptorインターフェイスを実装する必要があり、ここには2つのコアメソッドがあります。
「インターセプタークラスを指定する場合、それらを指定する必要がある」ことに注意することが重要です。 Laymanの用語では、完全なパッケージ名を追加し、そこにクラス名が1つだけではありません。また、プロデューサープログラムがインターセプタークラスを正しくロードできるようにする必要があります。 コントローラ「コントローラー、その主な機能は、Apache Zookeeperの助けを借りてKafkaクラスターを管理および調整することです。」クラスター内のブローカーはコントローラーとして機能することができますが、操作中、1人のブローカーのみがコントローラーになり、その管理と調整の責任を実行できます。 Kafkaコントローラーは、Zookeeperの時計機能を広範囲に使用して、クラスターの調整された管理を実現します。 「控制器是如何被选出来的」实际上,Broker在启动时,会尝试去ZooKeeper中创建/controller节点。 Kafka当前选举控制器的规则是:「第一个成功创建/controller节点的Broker会被指定为控制器」。 「控制器是做什么的」控制器的职责大致可以分为5种: 1.「主题管理(创建、删除、增加分区)」控制器帮助我们完成对Kafka主题的创建、删除以及分区增加的操作。 2.「分区重分配」3.「Preferred领导者选举」Preferred领导者选举主要是Kafka为了避免部分Broker负载过重而提供的一种换Leader的方案。 4.「集群成员管理(新增Broker、Broker主动关闭、Broker宕机)」包括自动检测新增Broker、Broker主动关闭及被动宕机。 这种自动检测是依赖于Watch功能和ZooKeeper临时节点组合实现的。 比如,控制器组件会利用「Watch机制」检查ZooKeeper的/brokers/ids节点下的子节点数量变更。 目前,当有新Broker启动后,它会在/brokers下创建专属的znode节点。 一旦创建完毕,ZooKeeper会通过Watch机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增Broker作业。 侦测Broker存活性则是依赖于刚刚提到的另一个机制:「临时节点」。 每个Broker启动后,会在/brokers/ids下创建一个临时znode。 当Broker宕机或主动关闭后,该Broker与ZooKeeper的会话结束,这个znode会被自动删除。 同理,ZooKeeper的Watch机制将这一变更推送给控制器,这样控制器就能知道有Broker关闭或宕机了,从而进行善后。 5.「数据服务」控制器上保存了最全的集群元数据信息,其他所有Broker会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。 「控制器故障转移(Failover)」 「故障转移指的是,当运行中的控制器突然宕机或意外终止时,Kafka能够快速地感知到,并立即启用备用控制器来代替之前失败的控制器」。这个过程就被称为Failover,该过程是自动完成的,无需你手动干预。 最开始时,Broker 0是控制器。当Broker 0宕机后,ZooKeeper通过Watch机制感知到并删除了/controller临时节点。 之后,所有存活的Broker开始竞选新的控制器身份。 Broker 3最终赢得了选举,成功地在ZooKeeper上重建了/controller节点。 之后,Broker 3会从ZooKeeper中读取集群元数据信息,并初始化到自己的缓存中。 至此,控制器的Failover完成,可以行使正常的工作职责了。 日志存储Kafka中的消息是以主题为基本单位进行归类的,每个主题在逻辑上相互独立。 每个主题又可以分为一个或多个分区,在不考虑副本的情况下,一个分区会对应一个日志。 但设计者考虑到随着时间推移,日志文件会不断扩大,因此为了防止Log过大,设计者引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,便于后续的消息维护和清理工作。 下图描绘了主题、分区、副本、Log、LogSegment五者之间的关系。 「LogSegment」在Kafka中,每个Log对象又可以划分为多个LogSegment文件,每个LogSegment文件包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。 其中,每个LogSegment中的日志数据文件大小均相等(该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的「log.segment.bytes」进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日志数据和索引文件)。 一般的なパラメーター「broker端配置」
每个kafka broker 都有一个唯一的标识来表示,这个唯一的标识符即是broker.id,它的默认值是0。 这个值在kafka 集群中必须是唯一的,这个值可以任意设定,
如果使用配置样本来启动kafka,它会监听9092 端口,修改port 配置参数可以把它设置成任意的端口。 要注意,如果使用1024 以下的端口,需要使用root 权限启动kakfa。
用于保存broker 元数据的Zookeeper 地址是通过zookeeper.connect 来指定的。 比如可以这么指定localhost:2181 表示这个Zookeeper 是运行在本地2181 端口上的。 我们也可以通过比如我们可以通过zk1:2181,zk2:2181,zk3:2181 来指定zookeeper.connect 的多个参数值。 该配置参数是用冒号分割的一组hostname:port/path 列表,其含义如下
如果你有两套Kafka 集群,假设分别叫它们kafka1 和kafka2,那么两套集群的zookeeper.connect参数可以这样指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2
Kafka 把所有的消息都保存到磁盘上,存放这些日志片段的目录是通过log.dirs 来制定的,它是用一组逗号来分割的本地系统路径,log.dirs 是没有默认值的,「你必须手动指定他的默认值」。 其实还有一个参数是log.dir,这个配置是没有s 的,默认情况下只用配置log.dirs 就好了,比如你可以通过/home/kafka1,/home/kafka2,/home/kafka3 这样来配置这个参数的值。
默认情况下,kafka 会自动创建主题 auto.create.topics.enable参数建议最好设置成false,即不允许自动创建Topic。 「主题相关配置」
num.partitions 参数指定了新创建的主题需要包含多少个分区,该参数的默认值是1。
这个参数比较简单,它表示kafka保存消息的副本数。
Kafka 通常根据时间来决定数据可以保留多久。 默认使用log.retention.hours参数来配置时间,默认是168 个小时,也就是一周。 除此之外,还有两个参数log.retention.minutes 和log.retentiion.ms 。 这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用log.retention.ms。
broker 通过设置message.max.bytes 参数来限制单个消息的大小,默认是1000 000, 也就是1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到broker 返回的错误消息。
规定了该主题消息被保存的时常,默认是7天,即该主题只能保存7天的消息,一旦设置了这个值,它会覆盖掉Broker 端的全局参数值。 消息丢失问题「生产者程序丢失数据」目前Kafka Producer是异步发送消息的,也就是说如果你调用的是producer.send(msg)这个API,那么它通常会立即返回,但此时你不能认为消息发送已成功完成。 如果用这个方式,可能会有哪些因素导致消息没有发送成功呢? 其实原因有很多,例如网络抖动,导致消息压根就没有发送到Broker端;或者消息本身不合格导致Broker拒绝接收(比如消息太大了,超过了Broker的承受能力)等。 实际上,解决此问题的方法非常简单:Producer永远要使用带有回调通知的发送API,也就是说不要使用producer.send(msg),而要使用producer.send(msg, callback)。 它能准确地告诉你消息是否真的提交成功了。 一旦出现消息提交失败的情况,你就可以有针对性地进行处理。 「消费者程序丢失数据」Consumer端丢失数据主要体现在Consumer端要消费的消息不见了。 下面这张图它清晰地展示了Consumer端的位移数据。 比如对于Consumer A而言,它当前的位移值就是9;Consumer B的位移值是11。 Consumer程序从Kafka获取到消息后开启了多个线程异步处理消息,而Consumer程序自动地向前更新位移。 假如其中某个线程运行失败了,它负责的消息没有被成功处理,但位移已经被更新了,因此这条消息对于Consumer而言实际上是丢失了。 这里的关键在于Consumer自动提交位移。 这个问题的解决方案也很简单: 「如果是多线程异步处理消费消息,Consumer程序不要开启自动提交位移,而是要应用程序手动提交位移」。ベストプラクティス总结Kafka无消息丢失的配置:
确保消息消费完成再提交,Consumer端有个参数enable.auto.commit,最好把它设置成false,并采用手动提交位移的方式。 重复消费问题「消费重复的场景」在enable.auto.commit 默认值true情况下,出现重复消费的场景有以下几种:
例如在一次poll 500条消息后,消费到200条时,进程被强制kill消费到offset未提交,或出现异常退出导致消费到offset未提交。 下次重启时,依然会重新拉取500消息,造成之前消费到200条消息重复消费了两次。 解决方案:在发生异常时正确处理未提交的offset 「消费者消费时间过长」 max.poll.interval.ms参数定义了两次poll的最大间隔,它的默认值是5 分钟,表示你的Consumer 程序如果在5 分钟之内无法消费完poll 方法返回的消息,那么Consumer 会主动发起离开组的请求,Coordinator 也会开启新一轮Rebalance。 举例:单次拉取11条消息,每条消息耗时30s,11条消息耗时5分钟30秒,由于max.poll.interval.ms 默认值5分钟,所以消费者无法在5分钟内消费完,consumer会离开组,导致rebalance。 在消费完11条消息后,consumer会重新连接broker,再次rebalance,因为上次消费的offset未提交,再次拉取的消息是之前消费过的消息,造成重复消费。 「解决方案:」1、提高消费能力,提高单条消息的处理速度;根据实际场景可讲max.poll.interval.ms值设置大一点,避免不必要的rebalance;可适当减小max.poll.records的值,默认值是500,可根据实际消息速率适当调小。 2、生成消息时,可加入唯一标识符如消息id,在消费端,保存最近的1000条消息id存入到redis或mysql中,消费的消息时通过前置去重。 消息顺序问题我们都知道kafka的topic是无序的,但是一个topic包含多个partition,每个partition内部是有序的 「乱序场景1」因为一个topic可以有多个partition,kafka只能保证partition内部有序 「解决方案」1、可以设置topic,有且只有一个partition 2、根据业务需要,需要顺序的指定为同一个partition 3、根据业务需要,比如同一个订单,使用同一个key,可以保证分配到同一个partition上 「乱序场景2」对于同一业务进入了同一个消费者组之后,用了多线程来处理消息,会导致消息的乱序 「解决方案」消费者内部根据线程数量创建等量的内存队列,对于需要顺序的一系列业务数据,根据key或者业务数据,放到同一个内存队列中,然后线程从对应的内存队列中取出并操作 「通过设置相同key来保证消息有序性,会有一点缺陷:」例如消息发送设置了重试机制,并且异步发送,消息A和B设置相同的key,业务上A先发,B后发,由于网络或者其他原因A发送失败,B发送成功;A由于发送失败就会重试且重试成功,这时候消息顺序B在前A在后,与业务发送顺序不一致,如果需要解决这个问题,需要设置参数max.in.flight.requests.per.connection=1,其含义是限制客户端在单个连接上能够发送的未响应请求的个数,设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求,这个参数默认值是5
高性能原因「顺序读写」kafka的消息是不断追加到文件中的,这个特性使kafka可以充分利用磁盘的顺序读写性能 顺序读写不需要硬盘磁头的寻道时间,只需很少的扇区旋转时间,所以速度远快于随机读写 Kafka 可以配置异步刷盘,不开启同步刷盘,异步刷盘不需要等写入磁盘后返回消息投递的ACK,所以它提高了消息发送的吞吐量,降低了请求的延时 「零拷贝」传统的IO 流程,需要先把数据拷贝到内核缓冲区,再从内核缓冲拷贝到用户空间,应用程序处理完成以后,再拷贝回内核缓冲区 这个过程中发生了多次数据拷贝 为了减少不必要的拷贝,Kafka 依赖Linux 内核提供的Sendfile 系统调用 在Sendfile 方法中,数据在内核缓冲区完成输入和输出,不需要拷贝到用户空间处理,这也就避免了重复的数据拷贝 在具体的操作中,Kafka 把所有的消息都存放在单独的文件里,在消息投递时直接通过Sendfile 方法发送文件,减少了上下文切换,因此大大提高了性能 「MMAP技术」除了Sendfile 之外,还有一种零拷贝的实现技术,即Memory Mapped Files Kafka 使用Memory Mapped Files 完成内存映射,Memory Mapped Files 对文件的操作不是write/read,而是直接对内存地址的操作,如果是调用文件的read 操作,则把数据先读取到内核空间中,然后再复制到用户空间,但MMAP可以将文件直接映射到用户态的内存空间,省去了用户空间到内核空间复制的开销 Producer生产的数据持久化到broker,采用mmap文件映射,实现顺序的快速写入 Customer从broker读取数据,采用sendfile,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。 「批量发送读取」Kafka 的批量包括批量写入、批量发布等。它在消息投递时会将消息缓存起来,然后批量发送 同样,消费端在消费消息时,也不是一条一条处理的,而是批量进行拉取,提高了消息的处理速度 「数据压缩」Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩 压缩的好处就是减少传输的数据量,减轻对网络传输的压力 Producer压缩之后,在Consumer需进行解压,虽然增加了CPU的工作,但在对大数据处理上,瓶颈在网络上而不是CPU,所以这个成本很值得 「分区机制」kafka中的topic中的内容可以被分为多partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力 面接でよく聞かれる質問「Kafka是Push还是Pull模式?」Kafka最初考虑的问题是,customer应该从brokes拉取消息还是brokers将消息推送到consumer。 在这方面,Kafka遵循了一种大部分消息系统共同的传统的设计:producer将消息推送到broker,consumer从broker拉取消息。 push模式由broker决定消息推送的速率,对于不同消费速率的consumer就不太好处理了。 消息系统都致力于让consumer以最大的速率最快速的消费消息,push模式下,当broker推送的速率远大于consumer消费的速率时,consumer恐怕就要崩溃了。
Pull模式的一个好处是consumer可以自主决定是否批量的从broker拉取数据。 Pull有个缺点是,如果broker没有可供消费的消息,将导致consumer不断在循环中轮询,直到新消息到达。 「Kafka如何保证高可用?」面试题:Kafka如何保证高可用?有图有真相 「Kafk的使用场景」业界Kafka实际应用场景
消息中间件在异步通信中用的最多,很多业务流程中,如果所有步骤都同步进行可能会导致核心流程耗时非常长,更重要的是所有步骤都同步进行一旦非核心步骤失败会导致核心流程整体失败,因此在很多业务流程中Kafka就充当了异步通信角色。
大规模分布式系统中的机器非常多而且分散在不同机房中,分布式系统带来的一个明显问题就是业务日志的查看、追踪和分析等行为变得十分困难,对于集群规模在百台以上的系统,查询线上日志很恐怖。 为了应对这种场景统一日志系统应运而生,日志数据都是海量数据,通常为了不给系统带来额外负担一般会采用异步上报,这里Kafka以其高吞吐量在日志处理中得到了很好的应用。
随着据量的增加,离线的计算会越来越慢,难以满足用户在某些场景下的实时性要求,因此很多解决方案中引入了实时计算。 很多时候,即使是海量数据,我们也希望即时去查看一些数据指标,实时流计算应运而生。 实时流计算有两个特点,一个是实时,随时可以看数据;另一个是流。 【編集者のおすすめ】
|
<<: エッジコンピューティングシステムの構成とコンセプトについて、複数の図を使って詳しく説明します。まだ覚えていますか?
>>: コンテナを理解するには、まずその歴史から始めましょう
SAP と Oracle はどちらも中規模企業および大規模企業の代名詞であると考えられています。彼ら...
占い、性格分析、ゆるキャラ、生贄など伝統的なビジネスは、インターネット上でどのように生き残ることがで...
cloudpowerallはマレーシアに登録された新しい会社で、主に低コスト路線でVPS(US cn...
SEO の完全な英語名は Search Engine Optimization で、中国語名は se...
最近、UMA(中国インターネット品質オーディエンスマーケティング連盟)という組織がビッグデータプラッ...
現在でも、AWS は Gartner Magic Quadrant でリーダーの地位を維持しています...
[[382699]]競争の激しいビジネス環境において、アクセシビリティ、機能性、汎用性を求める企業組...
現在、国内のネットワークは急速に発展しており、ウェブサイトの最適化とSEOもますます多くの人々に認識...
2020年に世界を席巻したCOVID-19パンデミックにより、オフライン教育にさまざまな程度の制限が...
2020年に「新インフラ」の構築が最高潮に達し始めたとき、クラウドコンピューティング業界が最も恩恵を...
みなさんこんにちは、私は Wei Dongdong です。多くのウェブマスターにとって、ISS ログ...
1月18日、テンセントクラウドは世界的なビデオソーシャルプラットフォームBIGOとの戦略的提携を発表...
月収10万元の起業の夢を実現するミニプログラム起業支援プラン私はかつてゲーム中毒でした。夢の中でもゲ...
丸一週間の準備の後、外部リンクは徐々に定期的になってきました。ほとんどの新人ウェブマスターと同様に、...
[[262785]] Technavio は、エッジ コンピューティング テクノロジの応用が 201...