これまで、Kafka の全体的なアーキテクチャ、Kafka プロデューサー、そして Kafka によって生成されたメッセージは最終的にどこに行くのかについて紹介しました。もちろん、それを消費する必要があります。そうしないと、一連のデータを生成するだけでは役に立たなくなります。カフカをレストランに例えると、生産者はシェフであり、消費者はゲストです。シェフだけがいると、そのシェフが作った料理を誰も食べなければ意味がありません。ゲストだけでシェフがいなかったら、誰がこのレストランに食事に行くでしょうか? !ですので、前回の記事を読んでもまだ満足できない方は、引き続きお楽しみください。前回の記事をまだ読んでいない方は、これから復習を始めたいと思います。 Kafka コンシューマーコンセプト アプリケーションは KafkaConsumer を使用して Kafka 内のトピックをサブスクライブし、これらのトピックからメッセージを受信して保存します。アプリケーションはまず、KafkaConsumer オブジェクトを作成し、トピックをサブスクライブしてメッセージの受信を開始し、メッセージを検証して結果を保存する必要があります。しばらくすると、プロデューサーはアプリケーションがデータを検証できるよりも速くトピックに書き込みます。こういう時、私たちは何をすべきでしょうか?単一のコンシューマーのみを使用すると、アプリケーションはメッセージ生成の速度に追いつくことができなくなります。複数のプロデューサーが同じトピックにメッセージを書き込むのと同様に、トピック内のメッセージの消費に参加し、メッセージを転送するには複数のコンシューマーが必要です。 Kafka コンシューマーはコンシューマー グループに属します。グループ内のコンシューマーは同じトピックをサブスクライブし、各コンシューマーはトピック パーティションの一部からメッセージを受信します。以下はKafkaのパーティション消費の図です。 上図のトピック T1 には、パーティション 0、パーティション 1、パーティション 2、パーティション 3 の 4 つのパーティションがあります。コンシューマー グループ 1 を作成します。コンシューマー グループにはコンシューマーが 1 つだけ存在します。トピック T1 をサブスクライブし、T1 内のすべてのメッセージを受信します。 1つのコンシューマーがパーティションから4つのプロデューサーに送信されたメッセージを処理するため、負荷が少し高くなり、タスクを分担するためのヘルパーが必要になるため、次の図のように進化します。 このようにして、消費者の消費能力は大幅に向上します。ただし、ユーザーが大量のメッセージを生成する場合など、一部の環境では、プロデューサーによって生成されるメッセージはコンシューマーが処理するには多すぎるため、コンシューマーの数を増やし続ける必要があります。 上の図に示すように、各パーティションによって生成されたメッセージは、各コンシューマー グループ内のコンシューマーによって消費されます。消費者グループにさらに消費者が追加されると、下の図に示すように、余分な消費者はアイドル状態になります。 コンシューマーをグループに追加することは、コンシューマーの容量を水平方向に拡張する主な方法です。まとめると、消費者グループ内の消費者数を増やして水平展開を行うことで、消費能力を高めることができます。そのため、トピックを作成するときは、より多くのパーティションを使用して、消費負荷が高いときにコンシューマーを追加してパフォーマンスを向上させることをお勧めします。さらに、余分なコンシューマーはアイドル状態になり、何の役にも立たないため、コンシューマーの数はパーティションの数より大きくしないでください。 Kafka の非常に重要な機能は、メッセージを一度だけ書き込むだけでよく、任意の数のアプリケーションがメッセージを読み取ることができることです。つまり、すべてのアプリケーションがメッセージの全量を読み取ることができます。各アプリケーションがメッセージの全量を読み取るためには、アプリケーションに異なるコンシューマー グループが必要です。上記の例で、新しい消費者グループG2を追加し、この消費者グループに2人の消費者がいる場合、次の図のように進化します。 このシナリオでは、コンシューマー グループ G1 とコンシューマー グループ G2 の両方がトピック T1 からの全量のメッセージを受信できます。論理的に言えば、それらは異なるアプリケーションに属します。 要約すると、アプリケーションがすべてのメッセージを読み取る必要がある場合は、アプリケーションのコンシューマー グループを設定してください。アプリケーションの消費容量が不十分な場合は、このコンシューマー グループにコンシューマーを追加することを検討してください。 コンシューマ グループとパーティションの再調整 消費者グループとは何ですか? コンシューマ グループは、1 つ以上のコンシューマ インスタンスのグループであり、スケーラブルでフォールト トレラントなメカニズムです。コンシューマー グループ内のコンシューマーは、グループ ID とも呼ばれるコンシューマー グループ ID を共有します。グループ内のコンシューマーは、一緒にトピックをサブスクライブして消費します。同じグループ内のコンシューマーは、1 つのパーティションからのメッセージのみを消費できます。余剰の消費者は怠惰になり、役に立たなくなります。 上記では2つの消費方法について説明しました。
消費者のバランス調整 上記のコンシューマー進化図から、次のようなプロセスがわかります。最初に、コンシューマーはトピックをサブスクライブし、そのすべてのパーティションからメッセージを消費します。その後、消費者がグループに参加し、さらに多くの消費者がグループに参加します。新しく追加されたコンシューマー インスタンスは、最初のコンシューマーのメッセージの一部を共有します。パーティションの所有権をあるコンシューマーから他のコンシューマーに転送するこの動作は、リバランスと呼ばれ、英語では Rebalance とも呼ばれます。下の図に示すように バランス調整は非常に重要です。消費者グループに高い可用性とスケーラビリティをもたらします。コンシューマーを安全に追加または削除することはできますが、通常の状況ではそのような動作は発生しません。再バランス調整中、コンシューマーはメッセージを読み取ることができないため、再バランス調整中はコンシューマー グループ全体が使用できなくなります。さらに、パーティションが別のコンシューマーに再割り当てされると、メッセージの現在の読み取り状態が失われ、キャッシュの更新が必要になる場合もあり、回復するまでアプリケーションの速度が低下します。 コンシューマーはコンシューマー グループ内のメンバーシップを維持し、組織コーディネーター (Kafka ブローカー) にハートビートを送信して所有するパーティションを確認します。消費者グループが異なれば、主催者やコーディネーターも異なる場合があります。コンシューマーがハートビートを定期的に送信している限り、コンシューマーは稼働中であり、そのパーティション内でメッセージを処理していると見なされます。ハートビートは、コンシューマーがレコードを取得したり、消費したレコードをコミットしたりするときに送信されます。 一定期間後に Kafka がハートビートの送信を停止すると、セッションは期限切れになり、オーガナイザーはコンシューマーが死亡したと判断し、再バランスがトリガーされます。コンシューマーがダウンしてメッセージの送信を停止した場合、コーディネーターは再バランスをトリガーする前に、コンシューマーがダウンしていることを確認するために数秒待機します。この間、デッド コンシューマーはメッセージを処理しません。コンシューマーをクリーンアップする場合、コンシューマーはグループを離れることをコーディネーターに通知し、組織コーディネーターは処理の一時停止を最小限に抑えるために再バランスをトリガーします。 バランス調整は諸刃の剣です。これは消費者グループに高い可用性とスケーラビリティをもたらしますが、コミュニティがまだ修正できていない明らかな欠点 (バグ) もいくつかあります。 リバランスプロセスは消費者グループに大きな影響を与えます。すべての再バランス処理によってすべてが停止するため、JVM のガベージ コレクション メカニズム、つまり Stop The World (STW) を参照してください (「Java 仮想マシンの徹底理解」の p76 にあるシリアル コレクターの説明から引用)。 さらに重要なのは、ガベージ コレクションを実行するときに、他のすべての作業スレッドを一時停止する必要があることです。回収されるまで。 Stop The World という名前はかっこいいですが、この作業は実際にはバックグラウンドの仮想マシンによって自動的に開始され、完了します。ユーザーの通常の作業スレッドはすべて、ユーザーの目に留まらないまま停止されますが、これは多くのアプリケーションでは受け入れられません。 つまり、再バランス調整期間中、コンシューマー グループ内のすべてのコンシューマー インスタンスは消費を停止し、再バランス調整が完了するまで待機します。そして、再調整プロセスは非常に遅いです... 消費者の創造 上記の理論はちょっとやりすぎなので、消費者がどのように消費するかをコードで説明してみましょう。 メッセージを読み取る前に、KafkaConsumer オブジェクトを作成する必要があります。 KafkaConsumer オブジェクトの作成は、KafkaProducer オブジェクトの作成と非常に似ています。つまり、コンシューマーに渡す必要があるプロパティをプロパティ オブジェクトに配置します。後ほど、Kafka のいくつかの構成に焦点を当てます。ここでは、bootstrap.server、key.deserializer、value.deserializer という 3 つのプロパティを使用して、単純なものを作成します。 これら 3 つのプロパティは何度も使用してきました。まだよくわからない場合は、「Kafka プロデューサー」の記事を参照して詳細を確認してください。 group.id と呼ばれる別の属性があります。この属性は必須ではありません。 KafkaConsumer がどのコンシューマー グループに属するかを指定します。どのグループにも属さないコンシューマーを作成することも可能です。
トピックの購読 コンシューマーを作成したら、次のステップはトピックをサブスクライブすることです。 subscribe() メソッドはトピックのリストをパラメータとして受け入れ、比較的簡単に使用できます。
簡単にするために、customerTopic という 1 つのトピックのみをサブスクライブします。渡されるパラメータは正規表現であり、複数のトピックに一致できます。誰かが新しいトピックを作成し、そのトピックの名前が正規表現と一致すると、すぐに再バランスがトリガーされ、コンシューマーは新しいトピックを読み取ることができます。 すべてのテスト関連トピックを購読するには、次のようにします。
投票 Kafka がサブスクリプション/パブリッシュ モデルをサポートしていることはわかっています。プロデューサーは Kafka ブローカーにデータを送信しますが、コンシューマーはプロデューサーがデータを送信したことをどのようにして知るのでしょうか?実際、消費者はプロデューサーによって生成されたデータを知りません。 KafkaConsumer はポーリングを使用して Kafka Broker からデータを定期的に取得します。データがあれば消費されます。そうでない場合は、ポーリングを継続して待機します。ポーリングと待機の具体的な実装は以下のとおりです。
スレッドセーフティ 同じグループ内で、1 つのスレッドに複数のコンシューマーを実行させたり、複数のスレッドにコンシューマーを安全に共有させたりすることはできません。ルールによれば、1 人の消費者は 1 つのスレッドを使用します。コンシューマー グループ内の複数のコンシューマーを実行する場合、各コンシューマーは独自のスレッドで実行する必要があります。 Java で ExecutorService を使用して、処理のために複数のコンシューマーを開始できます。 消費者の構成 これまで、コンシューマー API の使用方法を学習しましたが、最も基本的なプロパティのいくつかを紹介しただけです。 Kafka のドキュメントには、コンシューマー関連のすべての構成手順がリストされています。ほとんどのパラメータには適切なデフォルト値があり、通常は変更する必要はありません。これらのパラメータを以下に紹介します。
このプロパティは、コンシューマーがサーバーから取得する必要がある最小バイト数を指定します。ブローカーがコンシューマーからデータ要求を受信したときに、利用可能なデータの量が fetch.min.bytes で指定されたサイズより少ない場合、ブローカーは十分なデータが利用可能になるまで待機してから、コンシューマーにデータを返します。これにより、トピックがあまり頻繁に使用されていない場合にメッセージをやり取りする必要がないため、コンシューマーとブローカーの作業負荷が軽減されます。利用可能なデータがあまりないが、コンシューマーの CPU 使用率が高い場合は、このプロパティをデフォルトよりも高い値に設定することをお勧めします。消費者の数が多い場合は、このプロパティの値を増やすとブローカーの作業負荷を軽減できます。
上記の fetch.min.bytes を通じて、十分なデータが得られるまでコンシューマーにデータを返さないことを Kafka に伝えます。 fetch.max.wait.ms はブローカーの待機時間を指定するために使用され、デフォルトは 500 ミリ秒です。 Kafka に十分なデータが流入しない場合、コンシューマーが取得する必要があるデータの最小量が満たされず、500 ミリ秒の遅延が発生します。潜在的な遅延を減らしたい場合は、パラメータ値をより小さい値に設定できます。 fetch.max.wait.ms が 100 ミリ秒の遅延に設定され、fetch.min.bytes の値が 1 MB に設定されている場合、Kafka はコンシューマー要求を受信した後 1 MB のデータを返すか、100 ミリ秒後に利用可能なすべてのデータを返します。どの条件が最初に満たされるかによって異なります。
このプロパティは、サーバーが各パーティションからコンシューマーに返す最大バイト数を指定します。デフォルト値は 1MB です。つまり、各パーティションから KafkaConsumer.poll() メソッドによって返されるレコードは、max.partition.fetch.bytes で指定されたバイト数を超えません。トピックに 20 個のパーティションと 5 つのコンシューマーがある場合、各コンシューマーはレコードを受信するために少なくとも 4 MB の空きメモリを必要とします。グループ内のコンシューマーがクラッシュした場合、残りのコンシューマーがより多くのパーティションを処理する必要があるため、コンシューマーにメモリを割り当てるときは、より多くのメモリを割り当てることができます。 max.partition.fetch.bytes の値は、ブローカーが受信できる最大メッセージ サイズ (max.message.size プロパティによって構成) よりも大きくする必要があります。そうしないと、コンシューマーはこれらのメッセージを読み取ることができず、ハングして再試行する可能性があります。このプロパティを設定する際のもう 1 つの考慮事項は、コンシューマーがデータを処理するのにかかる時間です。セッションの有効期限切れとパーティションの再バランス調整を回避するために、コンシューマーは poll() メソッドを頻繁に呼び出す必要があります。 poll() への 1 回の呼び出しで返されるデータが多すぎる場合、コンシューマーはそれを処理するためにより多くの時間を必要とし、セッションの有効期限が切れるのを避けるために次のポーリングを時間内に実行できない可能性があります。このような場合は、max.partition.fetch.bytes の値を減らすか、セッションの有効期限を延長することができます。
このプロパティは、コンシューマーがサーバーから切断されてから、それが停止しているとみなされるまでの時間を指定します。デフォルトは3秒です。コンシューマーが session.timeout.ms で指定された時間内にグループ コーディネーターにハートビートを送信しない場合は、デッドとみなされ、コーディネーターは再バランスをトリガーします。パーティションをコンシューマー グループ内の他のコンシューマーに配布します。このプロパティは heartbeat.interval.ms と密接に関連しています。 heartbeat.interval.ms は、poll() メソッドがグループ コーディネーターにハートビートを送信する頻度を指定し、session.timeout.ms は、コンシューマーがハートビートを送信せずに待機できる時間を指定します。したがって、通常、これら 2 つのプロパティは同時に変更する必要があります。 Heartbeat.interval.ms は session.timeout.ms より小さくする必要があり、通常は session.timeout.ms の 3 分の 1 になります。 session.timeout.ms が 3 秒の場合、heartbeat.interval.ms は 1 秒にする必要があります。 session.timeout.ms をデフォルト値より小さい値に設定すると、クラッシュしたノードの検出と回復が速くなりますが、ポーリングやガベージ コレクションが長時間に及ぶと予期しない再バランス調整が発生する可能性があります。このプロパティを大きい値に設定すると、予期しない再バランス調整が減りますが、ノードのクラッシュを検出するのに時間がかかります。
このプロパティは、オフセットなしでパーティションを読み取るとき、またはオフセットが無効な場合にコンシューマーがどのように動作するかを指定します。デフォルト値は latest です。つまり、オフセットが無効な場合、コンシューマーは最新のレコードからデータの読み取りを開始します。もう 1 つの値は earliest です。これは、オフセットが無効な場合、コンシューマーがパーティションのレコードを最初から読み取り始めることを意味します。
オフセットをコミットするさまざまな方法については後ほど説明します。このプロパティは、コンシューマーがオフセットを自動的に送信するかどうかを指定します。デフォルト値は true です。データの重複やデータの損失をできるだけ回避するために、これを false に設定し、オフセットをいつ送信するかを自分で制御することができます。 trueに設定すると、auto.commit.interval.msプロパティを介してコミットの頻度を制御することもできます。
パーティションはグループ内のコンシューマーに割り当てられることがわかっています。 PartitionAssignor は、指定されたコンシューマーとトピックに基づいて、どのパーティションをどのコンシューマーに割り当てるかを決定します。 Kafka には、Range と RoundRobin という 2 つのデフォルトの割り当て戦略があります。
この属性には任意の文字列を指定できます。ブローカーはこれを使用して、クライアントから送信されたメッセージを識別します。通常、ログ、メトリック、クォータで使用されます。
このプロパティは、call() メソッドへの 1 回の呼び出しで返されるレコードの数を制御するために使用され、ポーリングで処理する必要があるデータの量を制御するのに役立ちます。
データの読み取りおよび書き込み時にソケットによって使用される TCP バッファのサイズも設定できます。 -1 に設定されている場合、オペレーティング システムのデフォルトが使用されます。プロデューサーまたはコンシューマーがブローカーとは異なるデータセンターにある場合、データセンター間のネットワークは一般にレイテンシが高く、帯域幅が低いため、これらの値を適切に増やすことができます。 コミットとオフセットの概念 特殊オフセット 上で述べたように、コンシューマーがスケジュールされたポーリングのために poll() メソッドを呼び出すたびに、プロデューサーによって Kafka に書き込まれたがコンシューマーによってまだ消費されていないレコードが返されます。したがって、グループ内のどのコンシューマーがどのレコードを読み取ったかを追跡できます。コンシューマーは Kafka を使用して、パーティション内のメッセージの位置 (オフセット) を追跡できます。 コンシューマーは、_consumer_offset と呼ばれる特別なトピックにメッセージを送信します。このトピックは、送信された各メッセージのパーティション オフセットを保存します。このトピックの主な機能は、消費者がリバランスをトリガーした後のオフセットを記録することです。コンシューマーがこのトピックにメッセージを送信するたびに、通常の状況では再バランスがトリガーされることはありません。このトピックは機能しません。再バランスがトリガーされると、コンシューマーは動作を停止し、各コンシューマーが対応するパーティションに割り当てられる場合があります。このトピックは、コンシューマーがメッセージの処理を続行できるように設定されています。 送信されたオフセットがクライアントによって最後に処理されたオフセットよりも小さい場合、2 つのオフセット間のメッセージは繰り返し処理されます。 送信されたオフセットが最後の消費のオフセットより大きい場合、2 つのオフセット間のメッセージは失われます。 _consumer_offset は非常に重要なので、どのように送信するのでしょうか?以下でそれについて話しましょう 提出方法 KafkaConsumer APIはオフセットをコミットする複数の方法を提供します 自動送信 最も簡単な方法は、消費者がオフセットを自動的にコミットできるようにすることです。 enable.auto.commit が true に設定されている場合、コンシューマーは 5 秒ごとに poll() メソッドによってポーリングされた最大オフセットを自動的にコミットします。コミット間隔は auto.commit.interval.ms によって制御され、デフォルトは 5 秒です。コンシューマー内の他のすべてと同様に、自動コミットはポーリング サイクルで実行されます。コンシューマーは各ポーリングでオフセットがコミットされているかどうかを確認し、コミットされている場合は、前回のポーリングから返されたオフセットをコミットします。 現在のオフセットをコミット auto.commit.offset を false に設定すると、アプリケーションがオフセットをコミットするタイミングを決定できるようになります。 commitSync() を使用してオフセットをコミットします。この API は、poll() メソッドによって返された最新のオフセットを送信し、送信が成功した場合はすぐに戻り、送信が失敗した場合は例外をスローします。 commitSync() は、poll() によって返された最新のオフセットをコミットします。すべてのレコードが処理された場合は、必ず commitSync() を呼び出してください。そうしないと、メッセージが失われるリスクが残ります。再バランス調整が発生すると、最新のメッセージ バッチから再バランス調整と再バランス調整の間に発生したメッセージまでのすべてのメッセージが繰り返し処理されます。 非同期送信 非同期 commitAsync() と同期 commitSync() の最大の違いは、非同期コミットは再試行されないのに対し、同期コミットは一貫して再試行される点です。 同期送信と非同期送信を組み合わせる 一般的に言えば、時々送信に失敗しても再試行しなければ、大きな問題にはなりません。送信の失敗が一時的な問題によるものであれば、その後の送信は必ず成功するからです。ただし、コンシューマーのクローズまたは再バランス調整前の最後の送信である場合は、送信が成功したことを確認する必要があります。 したがって、commitAsync と commitSync は通常、コンシューマーが閉じられる前にオフセットをコミットするために組み合わせて使用されます。 特定のオフセットをコミットする コンシューマー API を使用すると、コミットするパーティションとオフセットのマップを渡す (つまり、特定のオフセットをコミットする) ことで、commitSync() メソッドと commitAsync() メソッドを呼び出すことができます。 |
>>: Veeam 2020年の技術予測: クラウドが新たなユニバーサル展開モデルに
オンラインでマーケティングを行う方法は数多くあり、各企業のマーケティング手法は大きく異なります。ただ...
クラウドコンピューティングの世界では不可能なことは何もありません。クラウドバーストにより、新しいパッ...
Reprisehostingは低価格・格安サーバーに特化した事業者と言えます。アメリカ西海岸のシアト...
広告には「7秒」の印象理論があります。広告が消費者を引き付けられるかどうかの鍵は、最初の7秒にありま...
新創クラウドが大きな市場になることは間違いありません。一方では、国は情報化・イノベーション産業の建設...
「ダブル11」プロモーション戦争における電子商取引業界の競争は、価格と物流の競争からトラフィックをめ...
百度は外部リンクを張るには規模が大きすぎることを皆が知っています。質問に答えて、お互いに外部リンクを...
「Rancher により、インテリジェント データ プラットフォームを極めて安定して実行し、新しいサ...
2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています中国とは異...
ウェブマスターの友人なら誰でも、自分のウェブサイトに毎日たくさんの訪問者が訪れることを願っているはず...
宝くじサイトに記事を投稿していたとき、このことを深く理解しました。多くのサイトでは宝くじをセンシティ...
業界全体のユーザー規模から見ると、オンラインビデオ分野のアクティブユーザー規模の成長率は鈍化し、年末...
現在、外部リンクの構築に関しては、インターネット上で多くの検索が行われていますが、そのほとんどはフォ...
今日は、あまり知られていない製品、1dollar-webhosting.com を紹介したいと思いま...
1. はじめに今日は、ZStack のベアメタルについて学びます。ベアメタル サービスに関しては、多...