こんなに簡単だったことはかつてありませんでした。10 分で Kafka を倒すことができます。

こんなに簡単だったことはかつてありませんでした。10 分で Kafka を倒すことができます。

[51CTO.com からのオリジナル記事] Apache Kafka は、Scala と Java で記述された、高速でスケーラブル、高スループット、フォールト トレラントな分散型「パブリッシュ サブスクライブ」メッセージング システムであり、あるエンドポイントから別のエンドポイントにメッセージを配信できます。

[[318877]]

画像はPexelsより

従来のメッセージ ミドルウェア (ActiveMQ、RabbitMQ など) と比較すると、Kafka は高スループット、組み込みのパーティショニング、メッセージ レプリケーションのサポート、高いフォールト トレランスなどの特徴があり、大規模なメッセージ処理アプリケーションに非常に適しています。

Kafka 公式サイト: http://kafka.apache.org/

Kafka の主な設計目標は次のとおりです。

  • これは、O(1)の時間計算量でメッセージの永続性を提供し、TBレベルを超えるデータに対しても一定時間のアクセスパフォーマンスを保証できます。
  • 高いスループット。非常に安価な商用マシンでも、1 台のマシンで 1 秒あたり 10 万件のメッセージの送信をサポートできます。
  • 各パーティション内でのメッセージの順次送信を保証しながら、Kafka サーバー間のメッセージのパーティション分割と分散消費をサポートします。
  • オフラインデータ処理とリアルタイムデータ処理の両方をサポートします。
  • オンラインでの水平展開をサポートします。

Kafka は、主に次の 2 つのカテゴリのアプリケーションで使用されます。

  • システムまたはアプリケーション間でデータを確実に取得するためのリアルタイム ストリーミング データ パイプラインを構築します。
  • データ ストリームを変換したり、データ ストリームに反応したりするリアルタイム ストリーミング アプリケーションを構築します。

Kafka がこれらの操作をどのように実行するかを理解するために、Kafka の機能を最初から詳しく見ていきましょう。

まずいくつかの概念を説明します。

  • Kafka は、複数のデータセンターにまたがる 1 つ以上のサーバー上でクラスターとして実行されます。
  • Kafka クラスターは、トピックと呼ばれるカテゴリにレコードのストリームを保存します。
  • 各レコードは、キー、値、タイムスタンプで構成されます。

Kafka のアーキテクチャ システムは次のとおりです。

Kafka には多くのアプリケーション シナリオがあります。最も一般的なものをいくつか挙げます。

① ユーザーアクティビティの追跡:ウェブサイト上のユーザーのさまざまなアクティビティメッセージはさまざまなトピックセンターに公開され、これらのメッセージをリアルタイムで監視および処理できます。

もちろん、Hadoop またはオフライン処理データ ウェアハウスにロードして、ユーザーのプロファイルを作成することもできます。 Taobao、Tmall、JD.com などの大規模な電子商取引プラットフォームは、すべてのユーザーアクティビティを追跡します。

②ログ収集は以下のとおりです。

③ 電流制限とピークカットは下図のようになります。

④ 高いスループット:他のMQと比較して、Kafkaの最大の特徴はその高いスループットです。ストレージ容量を増やすために、Kafka はすべてのメッセージを低速で大容量のハードディスクに書き込みます。

論理的には、これによりパフォーマンスが低下しますが、実際には、Kafka は依然として非常に高いスループット レートを維持でき、パフォーマンスは影響を受けません。

主に以下の方法で高スループットを実現します。

  • 順次読み取りと書き込み: Kafka はメッセージをパーティションに書き込み、パーティション内のメッセージは順次読み取りと書き込みが行われます。順次読み取りと書き込みは、ランダム読み取りと書き込みよりも高速です。
  • ゼロ コピー: プロデューサーとコンシューマーは、Kafka のメッセージにゼロ コピーを使用します。
  • バッチ送信: Kafka ではバッチ送信モードが許可されます。
  • メッセージ圧縮: Kafka ではメッセージ コレクションの圧縮が可能です。

Kafka の利点は次のとおりです。

① デカップリング:プロジェクト開始時に、プロジェクトが将来どのような要件に遭遇するかを予測することは非常に困難です。

メッセージング システムは、暗黙的なデータベース インターフェイス レイヤーを処理の途中に挿入し、処理の両側でこのインターフェイスを実装する必要があります。

これにより、同じインターフェース制約に準拠している限り、両側での処理を個別に拡張または変更できます。

② 冗長化(コピー) :場合によっては、データ処理のプロセスが失敗することがあります。データが永続化されない限り、失われます。

メッセージ キューは、データが完全に処理されるまでデータを保持するため、データ損失のリスクを回避できます。

多くのメッセージ キューで採用されている「挿入、取得、削除」パラダイムでは、キューからメッセージを削除する前に、処理システムはメッセージが処理されたことを明確に示し、使用が完了するまでデータが安全に保存されるようにする必要があります。

③ スケーラビリティ: メッセージ キューは処理プロセスを分離するため、別の処理プロセスを追加するだけで、メッセージのエンキューと処理の頻度を簡単に増やすことができます。コードの変更やパラメータの調整は必要ありません。スケーリングは電源ボタンを押すだけで簡単に行えます。

④柔軟性とピーク処理能力:トラフィック量が大幅に増加した場合でも、アプリケーションは機能し続ける必要がありますが、このようなバーストトラフィックは一般的ではありません。このようなピーク時のトラフィックを処理するために、常にスタンバイ状態になるようにリソースを投資することは、間違いなく大きな無駄です。

メッセージ キューを使用すると、突然の過負荷要求によって主要コンポーネントが完全にクラッシュすることなく、突然のアクセス圧力に耐えることができます。

⑤回復性:システムコンポーネントの一部に障害が発生しても、システム全体に影響が及ぶことはありません。メッセージ キューはプロセス間の結合を減らすため、メッセージを処理するプロセスがクラッシュした場合でも、キューに追加されたメッセージはシステムの回復後に引き続き処理できます。

⑥ 順序保証:ほとんどの使用シナリオでは、データ処理の順序が非常に重要です。ほとんどのメッセージ キューは本質的に順序付けられており、データが特定の順序で処理されることを保証します。 Kafka はパーティション内のメッセージの順序を保証します。

⑦ バッファリング: あらゆる重要なシステムには、異なる処理時間を必要とする要素が存在します。たとえば、画像の読み込みにはフィルターの適用よりも時間がかかりません。

メッセージ キューはバッファー レイヤーを使用してタスクを最大限の効率で実行できるようにし、キューへの書き込み処理を可能な限り高速化します。このバッファは、システムがデータを通過する速度を制御および最適化するのに役立ちます。

⑧非同期通信: 多くの場合、ユーザーはメッセージをすぐに処理することを望まなかったり、処理する必要がありません。メッセージ キューは、ユーザーがメッセージをキューに入れてもすぐには処理しない、非同期処理メカニズムを提供します。必要な数のメッセージをキューに入れて、必要なときに処理することができます。

Kafka と他の MQ の比較は次のとおりです。

①RabbitMQ : RabbitMQはErlangで書かれたオープンソースのメッセージキューです。 AMQP、XMPP、SMTP、STOMP など、多くのプロトコルをサポートしています。このため、非常に重量級であり、エンタープライズ レベルの開発に適しています。

同時に、ブローカー アーキテクチャが実装され、メッセージはクライアントに送信される前に中央キューにキューイングされます。ルーティング、負荷分散、データの永続性に対する優れたサポートを備えています。

②Redis :Redisは、キーと値のペアをベースとしたNoSQLデータベースであり、活発に開発とメンテナンスが行われています。

Key-Value データベースストレージシステムですが、MQ 機能をサポートしているため、軽量なキューサービスとしてもご利用いただけます。

RabbitMQ および Redis のエンキューおよびデキュー操作はそれぞれ 100 万回実行され、10 万回ごとに実行時間が記録されます。テスト データは、128 バイト、512 バイト、1K、10K の 4 つの異なるサイズに分割されます。

実験によると、キューイング時に、データ サイズが比較的小さい場合、Redis のパフォーマンスは RabbitMQ よりも高くなりますが、データ サイズが 10K を超えると、Redis は耐えられないほど遅くなります。デキュー時には、データ サイズに関係なく、Redis は非常に優れたパフォーマンスを示しますが、RabbitMQ のデキュー パフォーマンスは Redis よりもはるかに低くなります。

③ZeroMQ : ZeroMQ は、特に高スループットが求められるシナリオにおいて、最速のメッセージキューシステムとして知られています。

ZeroMQ は RabbitMQ が苦手とする高度で複雑なキューを実装できますが、開発者は複数の技術フレームワークを自分で組み合わせる必要があります。技術的な複雑さは、この MQ をうまく適用する上での課題となります。

ZeroMQ には独自の非ミドルウェア モデルがあり、アプリケーションがサーバーの役割を果たすため、メッセージ サーバーやミドルウェアをインストールして実行する必要はありません。

NuGet を使用してインストールできる ZeroMQ ライブラリを参照するだけで、アプリケーション間でメッセージを簡単に送信できます。

ただし、ZeroMQ は非永続的なキューのみを提供するため、クラッシュするとデータが失われます。その中で、Twitter の Storm バージョン 0.9.0 より前では、デフォルトでデータ ストリームの送信に ZeroMQ を使用していました (Storm はバージョン 0.9 以降、送信モジュールとして ZeroMQ と Netty の両方をサポートしています)。

④ActiveMQ : ActiveMQはApacheのサブプロジェクトです。 ZeroMQ と同様に、ブローカーとピアツーピアのテクノロジーを使用してキューを実装できます。同時に、RabbitMQ と同様に、少量のコードで高度なアプリケーション シナリオを効率的に実装できます。

⑤Kafka/Jafka : Kafka は Apache のサブプロジェクトです。これは、高性能なクロス言語分散パブリッシュ/サブスクライブ メッセージ キュー システムです。 Jafka は、Kafka のアップグレード版である Kafka 上でインキュベートされました。

機能は次のとおりです:

  • 高速な永続性。O(1) のシステム オーバーヘッドでメッセージを永続化できます。
  • 一般的なサーバーで 10W/s のスループット率という高いスループットを実現できます。
  • 完全に分散されたシステムであるブローカー、プロデューサー、コンシューマーはすべて分散をネイティブにサポートし、自動的に負荷分散を実現します。
  • Hadoop データの並列ロードをサポートします。これは、ログ データや Hadoop などのオフライン分析システムに適したソリューションですが、リアルタイム処理には制限があります。

Kafka は、Hadoop の並列ロード メカニズムを通じてオンラインとオフラインのメッセージ処理を統合します。 Apache Kafka は、ActiveMQ と比較して非常に軽量なメッセージング システムです。非常に優れたパフォーマンスを備えているだけでなく、適切に機能する分散システムでもあります。

Kafka の重要な役割は次のとおりです。

①ストレージ システムとしての Kafka : 使用に関係なくメッセージの公開を許可するメッセージ キューは、実行中のメッセージのストレージ システムとして効果的に機能します。 Kafka が他と異なる点は、非常に優れたストレージ システムであることです。

Kafka に書き込まれたデータはディスクに書き込まれ、フォールト トレランスのために複製されます。 Kafka では、プロデューサーは完全に複製されるまで確認応答を待機でき、書き込みサーバーに障害が発生しても書き込みが完了しないことが保証されます。

Kafka のディスク上の構造は、適切に拡張できるように設計されています。サーバー上に 50 KB の永続データがあっても、50 TB の永続データがあっても、Kafka は同じように動作します。

Kafka はストレージを重視し、クライアントが読み取り位置を制御できるようにするため、高パフォーマンスで低レイテンシのコミット ログのストレージ、レプリケーション、および伝播用に設計された特殊な分散ファイル システムと考えることができます。

メッセージング システムとしての Kafka : Kafka のストリームの概念は、従来のエンタープライズ メッセージング システムとどのように異なりますか?

従来、メッセージングにはキューイングとパブリッシュ/サブスクライブの 2 つのモデルがありました。キューでは、一連のコンシューマーがサーバーから読み取り、各レコードがいずれかのコンシューマーに送信されます。

パブリッシュ/サブスクライブ レコード内のすべてのコンシューマーにブロードキャストします。これら 2 つのモデルにはそれぞれ長所と短所があります。

キューイングの利点は、データ処理を複数のコンシューマー インスタンスに分割して、処理能力を拡張できることです。

残念ながら、キューはマルチサブスクライバーではありません。プロセスがデータを読み取ると、そのデータは失われます。パブリッシュ/サブスクライブでは、データを複数のプロセスにブロードキャストできますが、すべてのメッセージがすべてのサブスクライバーに配信されるため、スケーラブルな処理はできません。

Kafka の消費者グループの概念は、これら 2 つの概念を一般化します。キューと同様に、コンシューマ グループを使用すると、一連のプロセス (コンシューマ グループのメンバー) 間で処理を分割できます。パブリッシュ/サブスクライブと同様に、Kafka では複数のコンシューマー グループにメッセージをブロードキャストできます。

Kafka モデルの優れた点は、各トピックがこれらの特性を備えていることです。つまり、処理をスケーリングでき、どちらか一方を選択することなく複数のサブスクライバーに対応できます。

Kafka は、従来のメッセージング システムよりも強力な順序保証も備えています。従来のキューはサーバー上でレコードを順番に保持し、複数のコンシューマーがキューから消費する場合、サーバーはレコードを格納された順序で配布します。

ただし、サーバーはレコードを順番にディスパッチしますが、レコードはコンシューマーに非同期的に配信されるため、異なるコンシューマーに順番どおりに到着しない可能性があります。

これは、並列使用の場合、レコードの順序が失われることを意味します。メッセージング システムは通常、1 つのプロセスのみがキューから消費できるようにする「排他的コンシューマー」の概念を使用してこの問題を解決しますが、これは当然、処理に並列性がないことを意味します。

Kafka はこれをよりうまく実行します。トピック内に並列処理 (つまりパーティション) の概念を持たせることで、Kafka はユーザー プロセスのプール全体にわたって順序の保証と負荷分散を提供できます。

これは、トピック内のパーティションをコンシューマー グループ内のコンシューマーに割り当て、各パーティションがグループ内の 1 人のコンシューマーによって完全に消費されるようにすることで実現されます。

こうすることで、コンシューマーがそのパーティションの唯一のリーダーとなり、データを順番に使用することが保証されます。パーティションが多数あるため、多くのコンシューマー インスタンス間で負荷を分散することも可能です。ただし、コンシューマ グループ内のコンシューマ インスタンスの数はパーティションの数より多くすることはできないことに注意してください。

③Kafka はストリーム処理に使用されます。データストリームを読み書きして保存するだけでは不十分です。目的は、ストリームのリアルタイム処理を実現することです。

Kafka では、ストリーム プロセッサとは、入力トピックから連続的なデータ ストリームを取得し、その入力に対して何らかの処理を実行し、出力トピックに連続的なデータ ストリームを生成するものを指します。

たとえば、小売アプリケーションは、売上と出荷の入力ストリームを受け取り、このデータから計算された再注文と価格調整のストリームを出力できます。

簡単な処理には、プロデューサー API とコンシューマー API を直接使用できます。ただし、より複雑な変換のために、Kafka は完全に統合された Streams API を提供します。

これにより、重要な処理を実行したり、ストリームの集計を計算したり、ストリームを結合したりするアプリケーションを構築できます。

この機能は、順序どおりでないデータの処理、コード変更時の入力の再処理、ステートフルな計算の実行など、このようなアプリケーションが直面する課題に対処するのに役立ちます。

ストリーム API は、Kafka によって提供されるコア プリミティブの上に構築されています。入力にはプロデューサー API とコンシューマー API を使用し、状態の保存には Kafka を使用し、ストリーム プロセッサ インスタンス間のフォールト トレランスには同じグループ メカニズムを使用します。

主要な Kafka 用語の説明

トピック: テーマ。 Kafka では、カテゴリ属性を使用してメッセージを分類します。メッセージを分類するクラスはトピックと呼ばれます。トピックはメッセージの分類ラベルに相当し、論理的な概念です。

異なるトピックからのメッセージは物理的に別々に保存されます。論理的には、トピックからのメッセージは 1 つ以上のブローカーに保存されますが、ユーザーは、データがどこに保存されているかを気にすることなく、メッセージのトピックを指定するだけでデータを生成または使用できます。

パーティション: パーティション。トピック内のメッセージは 1 つ以上のパーティションに分割されます。パーティションは物理的な概念であり、システム上の 1 つ以上のディレクトリに対応します。パーティション内のメッセージは順序付けられていますが、パーティション間のメッセージは順序付けられていません。

セグメント セグメント。パーティションはさらに複数のセグメントに分割され、各セグメント ファイルのサイズは同じです。

ブローカー: Kafka クラスターは 1 つ以上のサーバーで構成され、各サーバー ノードはブローカーと呼ばれます。

ブローカーはトピックデータを保存します。トピックに N 個のパーティションがあり、クラスターに N 個のブローカーがある場合、各ブローカーはトピックの 1 つのパーティションを格納します。

トピックに N 個のパーティションがあり、クラスターに (N+M) 個のブローカーがある場合、N 個のブローカーはトピックの 1 つのパーティションを格納し、残りの M 個のブローカーはトピックのパーティション データを格納しません。

トピックに N 個のパーティションがあり、クラスター内のブローカーの数が N 未満の場合、1 つのブローカーがトピックの 1 つ以上のパーティションを格納します。

実際の運用環境では、Kafka クラスターでデータの不均衡が生じやすくなるため、この状況を回避するようにしてください。

プロデューサー:プロデューサー。プロデューサーはメッセージの発行者であり、選択したトピックにデータを公開します。

プロデューサーは、トピック内のどのパーティションにどのレコードを割り当てるかを選択する責任があります。つまり、プロデューサーによって生成されたメッセージは、特定のパーティションに書き込まれます。

消費者: 消費者。メッセージはブローカーから読み取ることができます。コンシューマーは複数のトピックからのメッセージを消費できます。コンシューマーは同じトピック内の複数のパーティションからのメッセージを消費できます。パーティションにより、複数の消費者が同時に消費できるようになります。

コンシューマー グループ: コンシューマー グループは、Kafka によって提供されるスケーラブルでフォールト トレラントなコンシューマー メカニズムです。

グループには複数のコンシューマーが存在する可能性があり、それらは共通の ID (グループ ID) を共有します。グループ内のすべてのコンシューマーは、サブスクライブされたトピックのすべてのパーティションを消費するように調整します。

Kafka は、同じコンシューマー グループ内の 1 つのコンシューマーだけがメッセージを消費することを保証します。

実際、Kafka は、安定した状態では、各 Consumer インスタンスが 1 つ以上の特定のパーティションのみを消費し、パーティションのデータが特定の Consumer インスタンスによってのみ消費されることを保証します。

以下では、公式 Web サイトの画像を使用して、コンシューマーの数とパーティションの数の対応を示します。

2 つのコンシューマー グループを持つ 4 つのパーティション (P0 ~ P3) を持つ 2 サーバーの Kafka クラスター。コンシューマー グループ A には 2 つのコンシューマー インスタンスがあり、グループ B には 4 つのコンシューマー インスタンスがあります。

私はこれまでこの消費者グループを理解したことがありませんでした。私の結論は、トピックからグループへのパーティションは、パブリッシュ/サブスクライブの通信方法であるということです。

つまり、トピック パーティションのメッセージはすべてのグループによって消費され、1 対多モードに属します。グループとコンシューマー間の通信はポイントツーポイントであり、1対1モードに属します。

たとえば、グループが使用されず、10 個のコンシューマーがトピックの消費を開始すると、これらの 10 個のコンシューマーはトピック内のすべてのデータを取得できます。これは、このトピック内の任意のメッセージが 10 回消費されることに相当します。

グループを使用する場合は、接続時にグループ ID を指定すると、トピックのメッセージは 10 個のコンシューマーに配信され、各メッセージは 1 回だけ消費されます。

パーティションのレプリカ: パーティションのコピー。レプリカは、メッセージの損失を防ぐために作成されたパーティションのバックアップです。

パーティション リーダー: 各パーティションには複数のコピーがありますが、リーダーは 1 つだけです。リーダーは、現在メッセージの読み取りと書き込みを担当しているパーティションです。つまり、すべての読み取りおよび書き込み操作はリーダー パーティションでのみ実行できます。

パーティションフォロワー: すべてのフォロワーはリーダーからのメッセージを同期する必要があり、フォロワーとリーダーは常にメッセージを同期した状態に保ちます。リーダーとフォロワーの関係は、主人と奴隷の関係ではなく、主従関係です。

IR: いいえ

  • ISR (In-Sync Replicas) は、レプリカ同期リストを指します。 ISR リストはリーダーによって管理されます。
  • AR (割り当てられたレプリカ) は、パーティションのすべてのレプリカ、つまり割り当てられたレプリカのリストを指します。
  • OSR (Out-Sync Replicas) は、非同期レプリカのリストです。
  • AR = ISR + OSR

オフセット: オフセット。各メッセージには、現在のパーティションの下に一意の 64 バイトのオフセットがあり、これは現在のパーティションの最初のメッセージのオフセットに相当します。

ブローカー コントローラー: Kafka クラスター内の複数のブローカーの中から 1 つがコントローラーとして選出され、クラスター全体のパーティションとレプリカのステータスを管理する役割を担います。

ブローカー コントローラーのみがウォッチャーを Zookeeper に登録し、他のブローカーとパーティションは登録する必要はありません。つまり、Zookeeper は Broker Controller のステータスの変更を監視するだけで済みます。

HW と LEO:

  • HW (HighWatermark、高水準点) は、コンシューマーが消費できる最高のパーティション オフセットを示します。 HW は Kafka クラスター内のメッセージの一貫性を保証します。正確に言うと、パーティションのフォロワーとリーダー間のデータの一貫性を保証します。
  • LEO、ログ終了オフセット、ログ内の最後のメッセージのオフセット。メッセージは Kafka ログ ファイルに書き込まれます。これは、パーティションに書き込まれた最後のメッセージのオフセットです。
  • コンシューマーは、リーダーによって書き込まれた新しいメッセージをすぐに消費することはできません。リーダーは、HW を更新する前に、ISR 内のすべてのパーティション フォロワーによってメッセージが同期されるのを待機し、その後でのみ、コンシューマーがメッセージを消費できます。

上記の概念を読んだ後でもまだ混乱していると思います。わかりました! 2つの関係を図で説明してみましょう。

Zookeeper : Zookeeper は、ブローカーの維持と調整、およびブローカー コントローラーの選出を担当します。 Kafka 0.9 より前のバージョンでは、Offset は ZK によって管理されていました。

概要: ZK はコントローラーの選出を担当し、コントローラーはリーダーの選出を担当します。

コーディネーター: 一般的には、各ブローカーで実行されるグループ コーディネーター プロセスを指します。これは、主にオフセット変位管理とリバランスのために、コンシューマー グループのメンバーを管理するために使用されます。コーディネーターは複数のコンシューマー グループを同時に管理できます。

再バランス: グループ内のコンシューマーの数が変更されるか、トピック内のパーティションの数が変更されると、パーティションの所有権がコンシューマー間で転送され、つまりパーティションが再分配されます。このプロセスはリバランスと呼ばれます。

再バランス調整により、コンシューマー グループとブローカーに高パフォーマンス、高可用性、およびスケーラビリティがもたらされますが、再バランス調整中はコンシューマーはメッセージを読み取ることができないため、ブローカー クラスター全体が短時間使用できなくなります。したがって、不必要なリバランスは避けるべきです。

オフセットコミット: コンシューマーはブローカーからメッセージのバッチを受け取り、それを消費のためにバッファーに書き込みます。指定された時間内にメッセージを消費した後、消費されたメッセージのオフセットがブローカーに自動的に送信され、どのメッセージが消費されたかが記録されます。もちろん、期限内に消費が完了しない場合は、オフセットは提出されません。

Kafkaの動作原理とプロセス

①メッセージ書き込みアルゴリズム

メッセージ送信者はブローカーにメッセージを送信し、コンシューマーが使用できる最終的なログを形成します。これは比較的複雑なプロセスです。

  • プロデューサーはまず、Zookeeper からパーティションのリーダーを見つけます。
  • プロデューサーはリーダーにメッセージを送信します。
  • リーダーはローカル ログへのメッセージにアクセスし、ISR のフォロワーに通知します。
  • ISR のフォロワーはリーダーからメッセージをプルし、それをローカル ログに書き込み、リーダーに Ack を送信します。
  • リーダーは、ISR 内のすべてのフォロワーから Ack を受信すると、HW を増やしてプロデューサーに Ack を送信し、メッセージが正常に書き込まれたことを示します。

②メッセージルーティング戦略

API を通じてメッセージを公開する場合、プロデューサーはメッセージをレコードとして公開します。

レコードにはキーと値が含まれます。値は実際のメッセージそのものであり、キーはメッセージを保存するパーティションをルーティングするために使用されます。

メッセージが書き込まれるパーティションはランダムではなく、ルーティング戦略があります。

  • パーティションが指定されている場合、データは指定されたパーティションに直接書き込まれます。
  • パーティションが指定されておらず、キーが指定されている場合、モジュラスはキーのハッシュ値とパーティションの数を取得して計算されます。
  • 結果は選択されるパーティション インデックスです。
  • パーティションもキーも指定されていない場合は、ラウンドロビン アルゴリズムを使用してパーティションが選択されます。

③HW切り捨て機構

パーティション リーダーが新しいメッセージを受信し、ISR 内の他のフォロワーが同期中の場合、同期が完了する前にリーダーがクラッシュします。

この時点で、新しいリーダーを選出する必要があります。 HW 切り捨てメカニズムがない場合、パーティション内のリーダー データとフォロワー データは不整合になります。

元のリーダーがクラッシュして回復すると、その LEO はクラッシュ時の HW にロールバックされ、データは新しいリーダーと同期されます。これにより、古いリーダーのデータと新しいリーダーのデータが一致することが保証されます。このメカニズムは、HW 切り捨てメカニズムと呼ばれます。

④メッセージ送信の信頼性

プロデューサーが Kafka にメッセージを送信するときに、必要な信頼性のレベルを選択できます。 request.required.acks パラメータの値を介して設定します。

値 0: 非同期で送信します。プロデューサーは Kafka にメッセージを送信しますが、Kafka は成功の Ack で応答しません。この方法は最も効率的ですが、信頼性は最も低くなります。

メッセージが失われる状況が発生する可能性があります:

  • 送信中にメッセージが失われる可能性があります。
  • ブローカー内でメッセージが失われる可能性があります。
  • Kafka に書き込まれるメッセージの順序が生成順序と一致しない状況が発生する可能性があります。

値 1: 同期的に送信します。プロデューサーは Kafka にメッセージを送信し、ブローカーのパーティション リーダーはメッセージを受信後すぐに成功 Ack を送信します (ISR でのフォロワーの同期を待たずに)。

メッセージを受信したプロデューサーは、メッセージが正常に送信されたことを認識し、メッセージを再度送信します。プロデューサーが Kafka から Ack を受信して​​いない場合、メッセージの送信に失敗したとみなし、メッセージを再送信します。

プロデューサーにとっては、Ack を受信しなかった場合、メッセージの送信に失敗したことが確認でき、再送信することができます。

ただし、ACK を受信して​​も、メッセージが正常に送信されたという保証はありません。したがって、この場合、メッセージの損失も発生する可能性があります。

-1 値: 同期的に送信します。プロデューサーはカフカにメッセージを送信します。メッセージを受信した後、Kafka は ISR リスト内のすべてのレプリカがメッセージの同期を完了するまで待機し、その後プロデューサーに成功 Ack を送信します。

Kafka からの Ack が受信されない場合、メッセージは送信に失敗したとみなされ、自動的に再送信されます。この方法では、メッセージが重複して受信される可能性があります。

⑤消費者の消費プロセスの分析

プロデューサーはトピックにメッセージを送信し、コンシューマーはそれを消費できます。消費プロセスは次のとおりです。

  • コンシューマーはブローカーに接続要求を送信し、接続されたブローカーは構成ファイル内のリスナー アドレスであるブローカー コントローラーの通信 URL を送信します。
  • コンシューマーが消費するトピックを指定すると、ブローカー コントローラーに消費要求を送信します。
  • ブローカー コントローラは、1 つ以上のパーティション リーダーをコンシューマーに割り当て、パーティションの現在のオフセットをコンシューマーに送信します。
  • コンシューマーは、ブローカー コントローラーによって割り当てられたパーティション内のメッセージを消費します。
  • コンシューマーがメッセージの消費を終了すると、メッセージが消費されたことを示すメッセージ (メッセージのオフセット) をブローカーに送信します。
  • ブローカーはコンシューマーからオフセットを受け取った後、対応する __consumer_offset を更新します。
  • 上記のプロセスは、消費者が消費の要求を停止するまで繰り返されます。
  • コンシューマーはオフセットをリセットできるため、ブローカーに保存されているメッセージを柔軟に消費できます。

⑥パーティションリーダー選挙の範囲

リーダーがダウンすると、ブローカー コントローラーは ISR からフォロワーを選択して新しいリーダーになります。

ISR に他のレプリカがない場合はどうなりますか?リーダー選出の範囲は、unclean.leader.election.enable の値によって設定できます。

False : ISR リスト内のすべてのレプリカがアクティブになった後にのみ、新しい選択を実行する必要があります。この戦略は信頼性を保証しますが、可用性は低くなります。

True : ISR リストにレプリカがない場合、ダウンしていない任意のホストを新しいリーダーとして選択できます。この戦略は可用性が高くなりますが、信頼性は保証されません。

⑦繰り返し消費の問題への解決策

同じコンシューマーによる繰り返し消費: 消費容量が低いためにコンシューマーの消費タイムアウトが発生すると、繰り返し消費が発生する可能性があります。

データが消費されたばかりで、オフセットを送信しようとしている場合、消費時間がタイムアウトになり、ブローカーはメッセージが正常に消費されなかったと見なします。これにより、重複消費の問題が発生します。解決策: オフセットの送信時間を延長します。

異なるコンシューマーによる重複した消費: コンシューマーがメッセージを消費したが、オフセットを送信する前にクラッシュした場合、消費されたメッセージは繰り返し消費されます。解決策: 自動送信を手動送信に変更します。

⑧カフカの繰り返し消費問題を建築設計の観点から解決する

たとえば、プログラムを設計するときに、ネットワーク障害などの異常な状況を考慮して、メッセージの再試行回数を設定します。メッセージが重複する他の可能性もあります。どうやって解決すればいいのでしょうか?解決策は3つあります。

解決策1: 保存してクエリを実行する

各メッセージに一意の uuid を設定します。すべてのメッセージの uuid を保存する必要があります。

メッセージを消費するときは、まず永続性システムをチェックして、そのメッセージが以前に消費されたことがあるかどうかを確認します。そうでなければ、再度消費します。消費された場合には廃棄いたします。

次の図は、このアプローチを示しています。

解決策2: べき等性を利用する

べき等性は数学的に次のように定義されます: 関数 f(x) が f(f(x)) = f(x) を満たす場合、関数 f(x) はべき等性を満たします。

この概念はコンピュータ分野にまで拡張され、操作、方法、またはサービスを説明するために使用されます。べき等操作とは、複数回実行した場合の効果が 1 回実行した場合と同じである操作です。

べき等メソッドは、同じパラメータで複数回呼び出された場合も、1 回呼び出された場合も、システムに同じ影響を与えます。したがって、べき等メソッドの場合、繰り返し実行してもシステムに変更が生じることを心配する必要はありません。

これを説明するために例を挙げてみましょう。同時実行性を考慮せずに、「教師Xの口座残高を100万元に設定する」を1回実行すると、システムへの影響は教師Xの口座残高が100万元になることです。

100 万元という提供パラメータが変更されない限り、何度実行しても、X 氏の口座残高は常に 100 万元のまま変わりません。この操作はべき等操作です。

別の例を見てみましょう。「X先生の残高に100万元を追加する」この操作はべき等ではありません。実行されるたびに、口座残高は100万元ずつ増加します。複数回実行した場合と 1 回実行した場合では、システムへの影響 (つまり、アカウント残高) が異なります。

したがって、これら 2 つの例から、メッセージを消費するシステムのビジネス ロジックがべき等である場合、同じメッセージが 1 回消費されても複数回消費されても、システムに与える影響はまったく同じであるため、メッセージの重複の問題を心配する必要はないことがわかります。つまり、複数回摂取することは、1回摂取することと同等であると考えられます。

では、べき等操作をどのように実装するのでしょうか?最善の方法は、ビジネス ロジックの設計から始めて、コンシューマー ビジネス ロジックをべき等な操作に設計することです。

ただし、すべてのビジネスが自然にべき等性を持つように設計できるわけではないため、べき等性を実現するにはいくつかの方法とテクニックが必要です。

以下では、データベースの一意制約を使用して冪等性を実現する一般的な方法を紹介します。

たとえば、先ほど述べた振替の例には冪等性がありません。つまり、教師 X の口座残高に 100 万元を追加するというものです。この例では、ビジネス ロジックを変更してべき等性を持たせることができます。

まず、各振替注文に対して、各アカウントが実行できる変更操作を 1 つだけに制限できます。分散システムでは、この制限を実装する方法は多数あります。最も簡単な方法は、データベースに転送フロー テーブルを作成することです。

このテーブルには、振替注文 ID、アカウント ID、変更金額の 3 つのフィールドがあります。次に、振替注文 ID とアカウント ID の 2 つのフィールドを組み合わせて、一意の制約が作成されます。この方法では、同じ振替注文 ID とアカウント ID に対してテーブル内に最大 1 つのレコードが存在することになります。

このようにして、メッセージ消費のロジックを次のように変更できます。「転送フロー テーブルに転送レコードを追加し、転送レコードに基づいてユーザー残高を非同期的に更新します。」

振替フローテーブルに振替レコードを追加する操作では、このテーブルに「口座ID振替オーダーID」の一意制約を事前に定義しているため、同じ振替オーダーと同じ口座に対して挿入できるレコードは 1 つだけであり、その後の繰り返し挿入操作は失敗し、冪等操作が実現されます。

解決策3: 前提条件の設定

更新されたデータの前提条件を設定する べき等性を実現するもう 1 つの方法は、データの変更の前提条件を設定することです。条件が満たされると、データが更新されます。そうでない場合、データの更新は拒否されます。データを更新すると、前提条件で判断する必要があるデータも変更されます。

このように、この操作を繰り返すと、最初にデータを更新したときに前提条件で判断する必要があるデータが変更されているため、前提条件が満たされない場合はデータ更新操作が繰り返されません。

たとえば、「X先生の口座残高を100万元増やす」という操作は、べき等性を満たさないと述べました。この操作に前提条件を追加して、「教師 X のアカウントの現在の残高が 500 万元の場合、残高を 100 万元増やす」と変更すると、この操作は冪等性を持ちます。

メッセージ キューで使用すると、メッセージを送信するときに、現在の残高をメッセージ本文に含めることができます。消費する場合、データベース内の現在のバランスがメッセージのバランスに等しいかどうかを判断できます。それらが等しい場合にのみ、変更操作が実行されます。

ただし、更新するデータが数値ではない場合、またはより複雑な更新操作を実行する場合はどうなりますか?前提条件として何を使用すればよいですか?

より一般的な方法は、データにバージョン番号属性を追加することです。毎回データを更新する前に、現在のデータのバージョン番号をメッセージのバージョン番号と比較します。それらが一貫していない場合は、データの更新を拒否します。データを更新するときは、バージョン番号に1を追加します。これにより、iDempotenceも達成できます。

Kafkaクラスター構造

私たちの仕事では、環境の高度な可用性を確保し、単一のポイントを防ぐために、カフカはクラスターの形で現れます。今、私はあなたをKafkaクラスター環境の構築に導きます。

公式ウェブサイトからKafkaをダウンロードします。ダウンロードアドレスはhttp://kafka.apache.org/downloadsです。必要なバージョンをダウンロードします。安定したバージョンを使用することをお勧めします。

クラスターの構築

downDownLoadとUnzip

  1. CD /USR / LOCAL /SRC
  2. wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
  3. MKDIR /データ /サーバー
  4. TAR XZVF KAFKA_2.11-2.4.0.TGZ -C/DATA/SERVERS/
  5. cd /data/servers/kafka_2.11-2.4.0

configuration構成ファイルを修正します

Kafkaの構成ファイル$ kafka_home/config/server.propertiesは、主に次の項目を変更します。

  1. 各マシンでIDが異なることを確認してください
  2. ブローカーID=0
  3. サーバーの監視アドレスを構成します
  4. リスナー= plantext://192.168.51.128:9092
  5. Kafkaログディレクトリ
  6. log.dirs =/data/servers/kafka_2.11-2.4.0/logs
  7. #カフカが設定した部品の数
  8. パーティション数=1
  9.  
  10. Zookeeperの接続アドレス。独自のZookeeperクラスターをお持ちの場合は、直接構築したZookeeperクラスターを使用してください。
  11. Zookeeper.Connect = 192.168.51.128:2181

私は自分のローカルマシンで実験を行っているので、私が使用するのは同じホストの異なるポートだけです。オンラインでは、それらは異なるマシンです。あなたはそれらを参照することができます。

ここではKafkaのZookeeperを使用し、1つのノードのみを開始します。ただし、実際の生産プロセスでは、Zookeeperクラスターが必要です。自分で構築できます。また、Zookeeperチュートリアルを後でリリースしますので、注意してください。

Copy 3構成ファイル

  1. #対応するログディレクトリを作成します
  2. mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092
  3. mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093
  4. mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094
  5. #copy 3つの構成ファイル
  6. cp server.properties server_9092.properties
  7. cp server.properties server_9093.properties
  8. cp server.properties server_9094.properties

riffereさまざまなポートに対応するファイルを修正します

  1. #9092のIDは0、9093のIDは1、9094のIDは2です
  2. ブローカーID=0
  3. #サーバーの監視アドレスを構成し、さまざまな構成ファイルに異なるポートを書き込む
  4. リスナー= plantext://192.168.51.128:9092
  5. #kafkaログディレクトリ、ディレクトリは異なるポートにも対応しています
  6. log.dirs =/data/servers/kafka_2.11-2.4.0/logs/9092
  7. #カフカが設定した部品の数
  8. パーティション数=1
  9. #Zookeeper接続アドレス。独自のZookeeperクラスターをお持ちの場合は、直接構築したZookeeperクラスターを使用してください。
  10. Zookeeper.Connect = 192.168.51.128:2181

Zookeeper構成ファイルを変更します。

  1. datadir =/data/servers/zookeeper
  2. Server.1 = 192.168.51.128:2888:3888

次に、ZookeeperのMyIDファイルを作成します。

  1. echo "1" >/data/servers/zookeeper/myid

Start Zookeeper

Kafkaの組み込みZookeeperを使用してください。

  1. cd /data/servers/kafka_2.11-2.4.0/bin
  2. Zookeeper-Server-start.sh -daemon ../config/zookeeper.properties
  3. Netstat -Anp | Grep 2181

Kafka を起動します。

  1. ./kafka-server-start.sh -daemon ../config/server_9092.properties
  2. ./kafka-server-start.sh -daemon ../config/server_9093.Properties
  3. ./kafka-server-start.sh -daemon ../config/server_9094.properties

Kafka Operations

トピック

まず、トピックを作成するために一般的に使用されるパラメーターを見てみましょう。

  • -Create:トピックを作成します
  • - delete:トピックを削除します
  • -alter:トピックのパーティションの名前または数を変更する
  • - リスト:トピックを表示します
  • - 説明:トピックの詳細をご覧ください
  • - トピック
  • - Zookeeper

例:

  1. cd /data/servers/kafka_2.11-2.4.0/bin
  2. #トピックテストを作成します1
  3. Kafka- Topics.sh-Create -Bootstrap-Server = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 - Replication-Factor 1-Partitions 1--Topicic1
  4. #トピックテスト2を作成します2
  5. kafka-topics.sh -create - bootstrap-server = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 - 9094 - 再分析ファクター1-パリショナ
  6. #トピックを表示します
  7. kafka-topics.sh - list -bootstrap-server = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094

comation自動的にトピックを作成します

職場では、トピックを管理したくない場合は、Kafkaの構成ファイルを介してそれらを管理できます。

Kafkaがトピックを自動的に作成できるようにすることができ、次の構成ファイルをKafka構成ファイルに追加する必要があります。

  1. 自動。 .topics.enable = trueを作成します

物理的削除の目的を達成したい場合は、それを構成する必要があります。

  1. delete .topic.enable = true

メッセージを送信します

クライアントのコマンドを介してメッセージを作成できます。まず、Kafka-Console-Producer.shの一般的に使用されているいくつかのパラメーターを見てみましょう。

  • -TOPIC <文字列:トピック>:トピックを指定します
  • -timeout <integer:timeout_ms>:タイムアウト時間
  • - シンク:メッセージを非同期に送信します
  • -broker-list <string:broker-list>:公式ウェブサイトプロンプト:必須:フォームのブローカーリストstring host1:port1、host2:port2。

このパラメーターが必要です。

  1. Kafka-Console -Producer.sh-Broker-List 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 - トピックテスト1

④消費ニュース

Kafka-Console-Consumer.shのパラメーターを最初に見てみましょう。

  • -TOPIC <文字列:トピック>:トピックを指定します
  • --Group <String:Consumer Group ID>:消費者グループを指定します
  • - ビーニングから:消費は最初から行われることを指定します。
  • -bootstrap-server:Kafkaの接続アドレス
  1. kafka-console -consumer.sh-bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 - トピックテスト1---開始

カフカのログ

Kafkaのログは2つのタイプに分かれています。

  • ログの最初のタイプは、Kafkaスタートアップログです。これは、問題をトラブルシューティングしてエラー情報を表示するログです。
  • 2番目のタイプのログはデータログです。 Kafkaは、データがログの形で保存されているということです。 2番目のタイプのログは、パーティトンとセグメントです。

次に、バックアップとパーティションについて説明しましょう。パーティションとバックアップを作成する場合、テストは3つのマシンまたは3つのデータディレクトリに1つのテスト-0のみを使用する必要があります。 (パーティションの添え字は0から始まります)

nパーティションを作成すると、3つのサーバー、test_0-nで見つかります。Mバックアップを作成すると、test_0からtest_nのそれぞれがMであることがわかります。

Kafka API

KafkaネイティブAPIを使用します

消費者への自動提出

あなた自身のプロデューサーを定義します:

  1. org.apache.kafka.clients.producer.callbackをインポートします。
  2. org.apache.kafka.clients.producer.KafkaProducer をインポートします。
  3. org.apache.kafka.clients.producer.producerrecordをインポートします。
  4. org.apache.kafka.clients.producer.recordmetadataをインポートします。
  5. java.util.Properties をインポートします。
  6. /**
  7. * @classname mykafkaproducer
  8. * @description todo
  9. * @author lingxiangxiang
  10. * @日付午後3時37分
  11. * @バージョン 1.0
  12. **/
  13. パブリッククラス MyKafkaProducer {
  14. private org.apache.kafka.clients.producer.kafkaproducer < integer 、string>プロデューサー;
  15. public mykafkaproducer(){
  16. プロパティ properties = new Properties();
  17. Properties.put( "Bootstrap.Servers""192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094" );
  18. properties.put( "key.serializer""org.apache.kafka.common.serialization.integerserializer" );
  19. Properties.put( "value.serializer""org.apache.kafka.common.serialization.stringserializer" );
  20. //バッチ送信を設定します
  21. Properties.put( "batch.size" 、16384);
  22. //バッチ送信の待機時間は50msです。50msを超える場合、バッチサイズが不十分であっても送信されます。
  23. Properties.put( "linger.ms" 、50);
  24. this.producer = new org.apache.kafka.clients.producer.kafkaproducer < integer 、string>(properties);
  25. }
  26. public boolean sendmsg(){
  27. boolean result = true ;
  28. 試す {
  29. //通常の送信、test2はトピック、0はパーティションを表し、1はキーを表し、HelloWorldはメッセージコンテンツを送信します
  30. final produceRecord < integer 、string> record = new produceRecord < integer 、string>( "test2" 、0、1、 "Hello World" );
  31. プロデューサー.send(レコード);
  32. //コールバック関数で呼び出します
  33. プロデューサー.send(レコード、新しいコールバック() {
  34. @オーバーライド
  35. パブリックvoid onCompletion(RecordMetadata recordMetadata, 例外e) {
  36. システム。 out .println(recordmetadata.topic());
  37. システム。 out .println(Recordmetadata.Partition());
  38. システム。 out .println(recordmetadata.offset());
  39. }
  40. });
  41. //クラスを自分で定義します
  42. producer.send(レコード、新しいmycallback(レコード));
  43. } キャッチ (例外 e) {
  44. result = false ;
  45. }
  46. 結果を返します
  47. }
  48. }

プロデューサーが正常に送信するコールバック関数を定義します。

  1. org.apache.kafka.clients.producer.callbackをインポートします。
  2. org.apache.kafka.clients.producer.recordmetadataをインポートします。
  3. /**
  4. * @classname mycallback
  5. * @description todo
  6. * @author lingxiangxiang
  7. * @日付午後3時51分
  8. * @バージョン 1.0
  9. **/
  10. パブリッククラスのmycallbackはコールバックを実装しています{
  11. プライベートオブジェクトMSG;
  12. public mycallback(オブジェクトmsg){
  13. this.msg = msg;
  14. }
  15. @オーバーライド
  16. public void incompletion(Recordmetadata metadata、Exception E){
  17. システム。 out .println( "topic =" + metadata.topic());
  18. システム。 out .println( "partiton =" + metadata.partition());
  19. システム。 out .println( "offset =" + metadata.offset());
  20. システム。 out .println(msg);
  21. }
  22. }

プロデューサーのテストクラス:プロデューサーテストクラスでは、落とし穴に遭遇しました。つまり、最終的には睡眠を追加しませんでした。自分のコードを確認する方法に問題はありませんでしたが、最終的には成功メッセージを送信できず、睡眠を追加しました。

メイン関数のメインが実行され、終了されたため、メッセージは送信されていないため、待つ必要があります。もちろん、生産環境でこのような問題に遭遇しないかもしれません。

コードは次のとおりです。

  1. static java.lang.thread.sleepをインポートします。
  2. /**
  3. * @classname mykafkaproducertest
  4. * @description todo
  5. * @author lingxiangxiang
  6. * @日付午後3時46分
  7. * @バージョン 1.0
  8. **/
  9. パブリッククラスmykafkaproducertest {
  10. public static void main(string [] args)throws arturnedexception {
  11. mykafkaproducer producer = new mykafkaproducer();
  12. boolean result = producer.sendmsg();
  13. システム。 out .println( "msg" + resultを送信);
  14. スリープ(1000);
  15. }
  16. }

消費者カテゴリ:

  1. kafka.utils.shutdownablethreadをインポートします。
  2. org.apache.kafka.clients.consumer.consumerrecordをインポートします。
  3. org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。
  4. org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。
  5. java.util.Arrays をインポートします。
  6. java.util.Collections をインポートします。
  7. java.util.Properties をインポートします。
  8. /**
  9. * @classname mykafkaconsumer
  10. * @description todo
  11. * @author lingxiangxiang
  12. * @日付午後4時12分
  13. * @バージョン 1.0
  14. **/
  15. パブリッククラスmykafkacosumerはshutdownablethreadを拡張します{
  16. private kafkaconsumer < integer 、string> consumer;
  17. public mykafkaconsumer(){
  18. super( "kafkaconsumertest"false );
  19. プロパティ properties = new Properties();
  20. Properties.put( "Bootstrap.Servers""192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094" );
  21. properties.put( "group.id""mygroup" );
  22. properties.put( "enable.auto.commit""true" );
  23. properties.put( "auto.commit.interval.ms""1000" );
  24. Properties.put( "Session.Timeout.ms""30000" );
  25. properties.put( "heartbeat.interval.ms""10000" );
  26. properties.put( "auto.offset.reset""hireliest" );
  27. Properties.put( "key.deserializer""org.apache.kafka.common.serialization.integerdeserializer" );
  28. Properties.put( "value.deserializer""org.apache.kafka.common.serialization.stringdeserializer" );
  29. this.consumer = new Kafkaconsumer < integer 、string>(Properties);
  30. }
  31. @オーバーライド
  32. public void dowork(){
  33. consumer.subscribe(arrays.aslist( "test2" ));
  34. 消費者< integer 、string> records = consumer.poll(1000);
  35. for (ConsumerRecord Record:Records){
  36. システム。 out .println( "topic =" + record.topic());
  37. システム。 out .println( "partition =" + record.partition());
  38. システム。 out .println( "key =" +record。key( );
  39. システム。 out .println( "value =" + record.value());
  40. }
  41. }
  42. }

消費者テストクラス:

  1. /**
  2. * @classname myconsumertest
  3. * @description todo
  4. * @author lingxiangxiang
  5. * @日付午後4時23分
  6. * @バージョン 1.0
  7. **/
  8. パブリッククラスmyconsumertest {
  9. パブリック静的voidメイン(String[] args) {
  10. mykafkaconsumer Consumer = new mykafkaconsumer();
  11. Consumer.start();
  12. システム。 out .println( "=============================================================
  13. }
  14. }

②同定手動で送信します

以前の消費者は、オフセットを自動的に送信することにより、ブローカーでメッセージを消費しますが、自動提出はメッセージの繰り返しの消費を引き起こす可能性があります。

したがって、生産環境では、繰り返し消費の問題を解決するためにオフセットを手動で提出する必要があることがよくあります。

手動の提出は、同期の提出、非同期提出、および同期のジョイントの提出に分けることができます。これらの提出方法は、Dowork()メソッドが異なり、そのコンストラクターが同じであるというだけです。

したがって、第一に、コンストラクターは以前の消費者クラスに基づいて変更され、次に3つの異なる提出方法が個別に実装されます。

同期提出方法は、消費者がブローカーにオフセットを提出し、ブローカーが正常に応答するのを待つことです。応答がない場合、応答が得られるまで再提出されます。

そして、この待機プロセスでは、消費者はブロックされています。消費者のスループットに深刻な影響を与えます。

以前のmykafkaconsumer.javaを変更し、主に次の構成を変更します。

  1. kafka.utils.shutdownablethreadをインポートします。
  2. org.apache.kafka.clients.consumer.consumerrecordをインポートします。
  3. org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。
  4. org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。
  5. java.util.Arrays をインポートします。
  6. java.util.Collections をインポートします。
  7. java.util.Properties をインポートします。
  8. /**
  9. * @classname mykafkaconsumer
  10. * @description todo
  11. * @author lingxiangxiang
  12. * @日付午後4時12分
  13. * @バージョン 1.0
  14. **/
  15. パブリッククラスmykafkacosumerはshutdownablethreadを拡張します{
  16. private kafkaconsumer < integer 、string> consumer;
  17. public mykafkaconsumer(){
  18. super( "kafkaconsumertest"false );
  19. プロパティ properties = new Properties();
  20. Properties.put( "Bootstrap.Servers""192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094" );
  21. properties.put( "group.id""mygroup" );
  22. //ここでは、手動で送信するために変更する必要があります
  23. properties.put( "enable.auto.commit""false" );
  24. // properties.put( "auto.commit.interval.ms""1000" );
  25. Properties.put( "Session.Timeout.ms""30000" );
  26. properties.put( "heartbeat.interval.ms""10000" );
  27. properties.put( "auto.offset.reset""hireliest" );
  28. Properties.put( "key.deserializer""org.apache.kafka.common.serialization.integerdeserializer" );
  29. Properties.put( "value.deserializer""org.apache.kafka.common.serialization.stringdeserializer" );
  30. this.consumer = new Kafkaconsumer < integer 、string>(Properties);
  31. }
  32. @オーバーライド
  33. public void dowork(){
  34. consumer.subscribe(arrays.aslist( "test2" ));
  35. 消費者< integer 、string> records = consumer.poll(1000);
  36. for (ConsumerRecord Record:Records){
  37. システム。 out .println( "topic =" + record.topic());
  38. システム。 out .println( "partition =" + record.partition());
  39. システム。 out .println( "key =" +record。key( );
  40. システム。 out .println( "value =" + record.value());
  41. //手動で同期送信
  42. コンシューマー.commitSync();
  43. }
  44. }
  45. }

consunchers消費者によって手作業で提出されています

手動の同期送信方法では、ブローカーがうまく応答するのを待つ必要があります。

非同期提出方法は、ブローカーにオフセットを提出した後、消費者が成功した応答を待つ必要がないため、消費者のスループットが増加することです。

  1. kafka.utils.shutdownablethreadをインポートします。
  2. org.apache.kafka.clients.consumer.consumerrecordをインポートします。
  3. org.apache.kafka.clients.consumer.ConsumerRecords をインポートします。
  4. org.apache.kafka.clients.consumer.KafkaConsumer をインポートします。
  5. java.util.Arrays をインポートします。
  6. java.util.Collections をインポートします。
  7. java.util.Properties をインポートします。
  8. /**
  9. * @classname mykafkaconsumer
  10. * @description todo
  11. * @author lingxiangxiang
  12. * @日付午後4時12分
  13. * @バージョン 1.0
  14. **/
  15. パブリッククラスmykafkacosumerはshutdownablethreadを拡張します{
  16. private kafkaconsumer < integer 、string> consumer;
  17. public mykafkaconsumer(){
  18. super( "kafkaconsumertest"false );
  19. プロパティ properties = new Properties();
  20. Properties.put( "Bootstrap.Servers""192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094" );
  21. properties.put( "group.id""mygroup" );
  22. //ここでは、手動で送信するために変更する必要があります
  23. properties.put( "enable.auto.commit""false" );
  24. // properties.put( "auto.commit.interval.ms""1000" );
  25. Properties.put( "Session.Timeout.ms""30000" );
  26. properties.put( "heartbeat.interval.ms""10000" );
  27. properties.put( "auto.offset.reset""hireliest" );
  28. Properties.put( "key.deserializer""org.apache.kafka.common.serialization.integerdeserializer" );
  29. Properties.put( "value.deserializer""org.apache.kafka.common.serialization.stringdeserializer" );
  30. this.consumer = new Kafkaconsumer < integer 、string>(Properties);
  31. }
  32. @オーバーライド
  33. public void dowork(){
  34. consumer.subscribe(arrays.aslist( "test2" ));
  35. 消費者< integer 、string> records = consumer.poll(1000);
  36. for (ConsumerRecord Record:Records){
  37. システム。 out .println( "topic =" + record.topic());
  38. システム。 out .println( "partition =" + record.partition());
  39. システム。 out .println( "key =" +record。key( );
  40. システム。 out .println( "value =" + record.value());
  41. //手動で同期送信
  42. // consumer.commitsync();
  43. //マニュアル非同期提出
  44. // consumer.commitasync();
  45. //コールバックパブリックとのマニュアルの非同期提出
  46. Consumer.commitasync((offsets、e) - > {
  47. if(e!= null ){
  48. システム。 out .println( "提出数、offsets =" + offsets);
  49. システム。 out .println( "Exception =" + e);
  50. }
  51. });
  52. }
  53. }
  54. }

Kafkaを使用したスプリングブート

現在、開発プロセス中に、それらの多くはスプリングブートプロジェクトを使用して直接起動します。ネイティブAPIを使用している場合は、少し低くなります。では、KafkaはSpring Bootとどのように協力していますか?

Maven構成:

  1. <! -https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients->
  2. <依存関係>
  3. <グループ ID>org.apache.kafka</グループ ID>
  4. <artifactid> kafka-clients </artifactid>
  5. <バージョン> 2.1.1 </version>
  6. </依存関係>

構成ファイルを追加し、次の構成情報をApplication.Propertiesに追加します。

Kafka接続アドレス:

  1. spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094

プロデューサー:

  1. spring.kafka.producer.acks = 0
  2. spring.kafka.producer。 key -serializer = org.apache.kafka.common.serialization.stringserializer
  3. spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.stringserializer
  4. spring.kafka.producer.retries = 3
  5. spring.kafka.producer.batch- size = 4096
  6. spring.kafka.producer.buffer-memory = 33554432
  7. spring.kafka.producer.compression-type = gzip

消費者:

  1. spring.kafka.consumer。グループ-id = mygroup
  2. spring.kafka.consumer.auto-コミット-interval = 5000
  3. spring.kafka.consumer.heartbeat-interval = 3000
  4. spring.kafka.consumer。 key -deserializer = org.apache.kafka.common.serialization.stringdeserializer
  5. spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.stringdeserializer
  6. spring.kafka.consumer.auto-offset-reset =初期
  7. spring.kafka.consumer.enable-auto- commit = true
  8. #リスナー、消費者のモニターの数を特定します
  9. spring.kafka.listener.concurrency = 8
  10. #トピック名
  11. kafka.topic1 = topic1

プロデューサー:

  1. lombok.extern.slf4j.Slf4j をインポートします。
  2. Import org.springframework.beans.factory.annotation.value;
  3. org.springframework.kafka.core.kafkatemplateをインポートします。
  4. @サービス
  5. 翻訳者
  6. パブリッククラスmykafkaproducerserviceimpl mykafkaproducerservice {
  7. @リソース
  8. private kafkatemplate <string、string> kafkatemplate;
  9. //構成ファイルを読み取ります
  10. @value( "$ {kafka.topic1}"
  11. プライベート文字列トピック。
  12. @オーバーライド
  13. public void sendkafka(){
  14. kafkatemplate.send(トピック、 "Hell World" );
  15. }
  16. }

消費者:

  1. @成分
  2. 翻訳者
  3. パブリッククラスMyKafkaConsumer {
  4. @kafkalistener(topics = "$ {kafka.topic1}"
  5. public void listen(consumerrecord <?、?> record){
  6. optional <?
  7. if(kafkamessage.ispresent()){
  8. log.info( "----------------------レコード=" +レコード);
  9. log.info( "------------------ message =" + kafkaMessage.get());
  10. }

[51CTO オリジナル記事、パートナーサイトに転載する場合は、元の著者とソースを 51CTO.com として明記してください]

<<:  2020年第1四半期のクラウドサプライチェーン収益レポートのレビュー

>>:  Redisの作者と分散マスターが戦い、最大の勝者はメロンを食べるネットユーザーだった

推薦する

留学ウェブサイト編集経験

私はこの業界に4年以上いるので、あまりプロフェッショナルとは言えません。しかし、教育ポータルの留学チ...

PyramidServer - ドイツ製 KVM が 50% オフ、限定オファー

Pyramid Server は 2007 年に設立され、2010 年に正式に会社として運営を開始し...

ウェブマスターの注目を集めるがランキングには関係しない3つの要素

すべての SEO 担当者は、ウェブサイトのランキングという共通の目標を持っていると私は信じています。...

Hongmeng HarmonyOS 分散ソフトバス: 低遅延、高帯域幅のマルチデバイス仮想ネットワークの構築

[[352387]]詳細については、以下をご覧ください。 51CTOとHuaweiが共同で構築したH...

福州警察はフィッシングサイトのソースコードを作成して販売していたグループを壊滅させた

記者が昨日、市公安局から得た情報によると、警察は6か月以上の綿密な捜査を経て、1日にフィッシングサイ...

ビリビリと小紅書の「リブ」、虎埔?

ユーザー数の増加というプレッシャーがますます顕著になる中、若者を主なユーザーとするビリビリと、女性を...

毎日の話題:DianpingのEle.meへの投資は、食品配達O2O市場の獲得を狙っているのか?

A5ウェブマスターネットワーク(www.admin5.com)は5月6日、食品配達ウェブサイトEle...

クラウドコンピューティングが普及した時代において、企業はどのようにして優れたクラウドサーバーを選択できるのでしょうか?

インターネット時代において、デジタル変革は新たな要件と新たなトレンドとなっています。企業は、変革を効...

草の根SEO担当者はBaiduの国際化戦略についてどう考えているのでしょうか?

昨日、JD.comが3.cnドメイン名を取得したとお伝えしました。今朝起きてニュースをチェックしたと...

sharktech - 60% オフ プロモーション (米国で最も強力な D サーバー)

4月に友人がsharktechからサーバーを購入するのを手伝いました。なぜここからサーバーを購入した...

李佳琦とヴィヤは本当にそんなに重要なのでしょうか?

今年はライブストリーミング販売が大人気です。大企業であれ、個人事業主であれ、商品を売るのに役立つアン...

分散 KVM とは何かを理解するための分散 KVM システム アーキテクチャ図

[[314761]] ご存知のとおり、コマンド センター、コントロール センターなどのシナリオでは、...

Xinnetにバックエンド管理の脆弱性が見つかり、数十万件のドメイン名管理パスワードが漏洩した

Admin5によると、12月22日、国内のセキュリティ情報プラットフォームWuyun - 脆弱性報告...

5 月の世界 Web サーバー市場シェア: Microsoft のみが市場シェアを 33.41% に増加

IDC Review Network (idcps.com) は 5 月 13 日に次のように報告し...

ビジネスイノベーションの加速 マルチクラウド管理はエンタープライズ開発に必須ですか?

現在、クラウド コンピューティング業界とインターネット アプリケーションがかつてないほど発展しており...