Kafka を使い始めましょう。知れば知るほど、知らないことが増えます!

Kafka を使い始めましょう。知れば知るほど、知らないことが増えます!

[[340900]]

この記事はWeChatの公開アカウント「小蔡良基」から転載したもので、著者は蔡歩才です。この記事を転載する場合は、Xiaocailiangji公式アカウントまでご連絡ください。

初期のカフカ

1. はじめに

Kafka はもともと、ZooKeeper コーディネーションに基づくマルチパーティション、マルチレプリカの分散メッセージング システムとして、Scala 言語を使用して Linkedin によって開発されました。現在、Apache Foundation に寄贈されています。現在、Kafka は分散ストリーム処理プラットフォームとして位置付けられています。高いスループット、永続性、水平スケーラビリティ、ストリーム データ処理のサポートなどの機能により、広く使用されています。

2. 使用シナリオ

メッセージング システム: Kafka と従来のメッセージング システム (メッセージング ミドルウェア) はどちらも、システム分離、冗長ストレージ、トラフィック ピークの削減、バッファリング、非同期通信、スケーラビリティ、回復可能性などの機能を備えています。同時に、Kafka は、ほとんどのメッセージング システムでは実現が難しいメッセージ順序の保証とバックトラッキング消費機能も提供します。

ストレージ システム: Kafka はメッセージをディスクに保存するため、他のメモリベースのストレージ システムと比較してデータ損失のリスクが効果的に軽減されます。 Kafka のメッセージ永続化機能とマルチコピーメカニズムのおかげで、Kafka を長期データストレージシステムとして使用できます。対応するデータ保持ポリシーを「永続的」に設定するか、トピックのログ圧縮機能を有効にするだけです。

ストリーム処理プラットフォーム: Kafka は、一般的なストリーム処理フレームワークごとに信頼性の高いデータ ソースを提供するだけでなく、ウィンドウ、接続、交換、集約などの操作を含む完全なストリーム処理ライブラリも提供します。

3. 基本概念

Kafka システム アーキテクチャには、複数の「プロデューサー」、「ブローカー」、「コンシューマー」、および ZooKeeper クラスターが含まれます。

  • ZooKeeper: Kafka は、クラスターのメタデータ管理、コントローラーの選択、その他の操作を担当します。
  • プロデューサー: メッセージを送信するプロデューサー。メッセージを作成し、それを Kafka に配信する役割を担います。
  • 消費者: メッセージを受信する当事者。 Kafka に接続した後、メッセージを受信し、対応するビジネス ロジック処理を実行します。
  • ブローカー: サービス プロキシ ノード。 Kafka の場合、Broker は単純に独立した Kafka サービス ノードまたは Kafka サービス インスタンスと見なすことができます。ほとんどの場合、サーバーにデプロイされている Kafka インスタンスが 1 つだけであれば、Broker は Kafka サーバーと見なすこともできます。 1 つ以上のブローカーが Kafka クラスターを形成します。

Kafka システム全体は、おおよそ上記の部分で構成されています。さらに、特に重要な概念が2つあります。トピックとパーティションです。

  • トピック: Kafka 内のメッセージはトピック別に分類されます。プロデューサーは特定のトピックにメッセージを送信する責任があり (Kafka クラスターに送信される各メッセージはトピックを指定する必要があります)、コンシューマーはトピックをサブスクライブして消費する責任があります。
  • パーティション: トピックは論理的な概念です。複数のパーティションに分割することもできます。パーティションは単一のトピックにのみ属します。パーティションはトピック パーティションと呼ばれることもあります。同じトピックの下にある異なるパーティションには、異なるメッセージが含まれます。パーティションは、ストレージ レベルで追加可能な「ログ ファイル」と見なすことができます。メッセージがパーティション ログ ファイルに追加されると、特定のオフセットが割り当てられます。オフセットは、パーティション内のメッセージの一意の識別子です。 Kafka はこれを使用して、パーティション内のメッセージの順序を確保します。ただし、オフセットはパーティションにまたがりません。つまり、Kafka はトピックではなくパーティションの順序を保証します。

Kafka はパーティションのマルチコピー (レプリカ) メカニズムを導入し、レプリカの数を増やすことで災害復旧機能を向上させることができます。

同じメッセージは同じパーティションの異なるレプリカに保存されます (レプリカは同時にまったく同じではありません)。レプリカには「1 つのマスターと複数のスレーブ」の関係があり、リーダー レプリカは読み取りおよび書き込み要求の処理を担当し、フォロワー レプリカはリーダー レプリカとのメッセージの同期のみを担当します。レプリカは異なるブローカーにあります。リーダー レプリカに障害が発生すると、外部サービスを提供するためにフォロワー レプリカから新しいリーダー レプリカが再選出されます。

「Kafka は、マルチコピー メカニズムを通じて自動フェイルオーバーを実現します。これにより、Kafka クラスター内のブローカーに障害が発生した場合でも、サービスが引き続き利用可能になります。」

Kafka の理解を続ける前に、いくつかのキーワードを理解する必要があります。

  • AR(割り当てられたレプリカ):パーティション内のすべてのレプリカは、総称してARと呼ばれます。
  • ISR (同期レプリカ): リーダー レプリカ (リーダー レプリカを含む) と一定の同期を維持するすべてのレプリカが ISR を構成します。 ISR セットは AR セットのサブセットです。メッセージは最初にリーダー コピーに送信され、その後フォロワー コピーは同期のためにリーダー コピーからメッセージをプルできます。同期期間中、フォロワー コピーはリーダー コピーよりある程度遅れます。
  • OSR (Out-of-Sync Replicas): リーダーレプリカから大幅に遅れているレプリカ (リーダーレプリカを除く) は OSR を形成します。

上記の関係から、次の式を導き出すことができます: AR=ISR+OSR

  • HW (ハイ ウォーターマーク): 一般的にハイ ウォーターマークと呼ばれ、特定のメッセージ オフセットを識別するために使用されます。コンシューマーはこのオフセットより前のメッセージのみをプルできます。
  • LEO (LogStartOffset): 次に書き込まれるメッセージのオフセット

これを見た多くの友人は少し焦っていると思います。カフカはなぜ難しいのでしょうか?それでもうまく習得できるでしょうか?

心配しないでください。まずは理論的な知識を学ぶ必要があります。これは落胆の始まりではなく、成長の始まりです。以下のレシピは、最も簡単な文章を使用して、最も深い穴に導くことを目指しています。

カフカ制作チーム

ご存知のとおり、Kafka は高度な意味では分散メッセージ キューですが、簡単に言えば単なるメッセージ キューです。簡単に言えば、メッセージ キューはデータをプッシュしたり取得したりするためのものです。そうです、高度な知識を得るには、多くの場合、単純な理解が必要です。

では、データはどこから来るのでしょうか?データは制作チームから提供されました!プログラミングの観点から見ると、プロダクション チームにはプロデューサーのグループがあり (もちろん 1 つだけの場合もあります)、プロデューサーは Kafka にメッセージを送信する役割を担うアプリケーションです。

クライアント開発

製造プロセスには、一般的に次の手順が含まれます。

  • プロデューサークライアントパラメータを設定し、レスポンスのプロデューサーインスタンスを作成する
  • 送信するメッセージの構築
  • メッセージを送信
  • プロデューサーインスタンスをシャットダウンする

「4 つのステップと 1 つのシャトルで生産上の問題を解決」

上記のコードでは、プロパティ ファイルに次の 4 つのパラメータを指定していることがわかります。

  • bootstrap.servers: このパラメーターは、プロデューサー クライアントが Kafka クラスターに接続するために必要なブローカー アドレスを指定するために使用されます。形式は (host1:port1, host2:port2) です。カンマで区切って 1 つ以上のアドレスを設定できます。デフォルト値は「」です
  • key.serializer と value.serializer: それぞれキーと値のシリアル化操作のシリアライザーを指定するために使用されます。これら2つのパラメータにはデフォルト値がなく、シリアライザの完全修飾名を入力する必要があります。
  • client.id: KafkaProducer に対応するクライアント ID を設定するために使用されます。デフォルト値は「 」です。クライアントが設定しない場合、KafkaProducer は、文字列「producer-」と数字を連結した「producer-1」、「producer-2」の形式の内容を持つ空でない文字列を自動的に生成します。

ProducerRecord の定義は次のとおりです。

  • トピックとパーティション: それぞれ、メッセージが送信されるトピックとパーティション番号を表します。
  • ヘッダー: メッセージのヘッダー。必要ない場合は設定しないでおくことができます。
  • key: メッセージを指定するために使用されるキー。これはメッセージへの追加メッセージであるだけでなく、メッセージを特定のパーティションに送信できるようにパーティション番号を計算するためにも使用できます。
  • 値: メッセージ本文。通常は空ではありません。空の場合は、特定のメッセージ(「墓石メッセージ」)を示します。
  • タイムスタンプ: メッセージのタイムスタンプ。 CreateTime と LogAppendTime の 2 つのタイプがあります。前者はメッセージが作成された時刻を示し、後者はメッセージがログ ファイルに追加された時刻を示します。

上記の操作は、プロデューサー インスタンスを作成し、メッセージを構築することです。メッセージを送信するための主なモードは 3 つあります。

  • ファイア・アンド・フォーゲット
  • 同期
  • 非同期

上記で使用した送信方法は、ファイア アンド フォーゲットです。メッセージが正しく到着したかどうかを気にせずに、Kafka にメッセージを送信するだけです。ほとんどの場合、この送信方法では問題はありませんが、場合によっては (再試行できない例外が発生する) メッセージが失われることがあります。 「この配信方法は最高のパフォーマンスを提供しますが、信頼性は最も低くなります。」

  1. パブリックFuture<RecordMetadata> を送信します (ProducerRecord<K,V> レコード) {}

sendメソッドからはFutureオブジェクトが返される。

  1. 将来の res = producer.send(record);

これは、send() メソッド自体が非同期であり、send() メソッドによって返される Future オブジェクトにより、呼び出し元が後で送信の結果を取得できることを示しています。同期効果を実現したい場合は、Future の get() メソッドを直接呼び出すことができます。

  1. 試す {
  2. プロデューサー.send(レコード).get();
  3. } キャッチ (例外 e) {
  4. e.printStackTrace();
  5. }

get() メソッドを使用して、メッセージが正常に送信されるか例外が発生するまで、Kafka の応答をブロックして待機します。

生産を非同期にすることはできますか?

Kafka では、send() メソッドに別のオーバーロードがあります。

  1. パブリックFuture<RecordMetadata> send(ProducerRecord<K,V> レコード、コールバック コールバック) {}
  1. プロデューサー.send(レコード、新しいコールバック() {
  2. @オーバーライド
  3. パブリックvoid onCompletion(RecordMetadata recordMetadata, 例外e) {
  4. if ( Objects.isNull (e)) {
  5. システム。出力.println( "トピック: " + recordMetadata.topic());
  6. }それ以外{
  7. システム。出力.println(e.getMessage());
  8. }
  9. }
  10. });

コールバックメソッドは非常にシンプルで明確です。 Kafka は応答するとコールバックし、正常に送信するか例外をスローします。

onCompletion() メソッドの 2 つのパラメーターは相互に排他的です。送信が成功した場合、RecordMetadata は空ではなく、Exception は空になります。送信に失敗した場合は、その逆になります。

製作上難しいところはありますか?

KafkaProducer で一般的に発生する例外には 2 種類あります。

  • 再試行可能な例外

ネットワーク例外、リーダー利用不可例外、不明トピックまたはパーティション例外、

レプリカが十分でない例外、コーディネーターが不十分な例外

  • 再試行不可能な例外

RecordTooLargeException など

再試行可能な例外の場合は、「retries」パラメータを設定できます。指定された再試行回数内に回復した場合、例外はスローされません。 「retries」パラメータのデフォルト値は 0 です。設定方法は次のとおりです。

  1. properties.put(ProducerConfig.RETRIES_CONFIG, 10);

上記の例では、再試行回数は 10 回です。10 回後に回復が失敗すると、例外がスローされます。

RecordTooLargeException などの再試行不可能な例外は、送信されるメッセージが大きすぎる場合、再試行は実行されず、例外が直接スローされることを示します。

シリアル化で役立つ

プロデューサーは、ネットワーク経由で Kafka に送信する前に、シリアライザーを使用してオブジェクトをバイト配列に変換する必要があります。対応するコンシューマーも、デシリアライザーを使用して、Kafka で受信したバイト配列を対応するオブジェクトに変換する必要があります。

上記のコードで使用されているStringSerializerはSerializerインターフェースを実装しています。

configure() メソッドは現在のクラスを構成するために使用され、serizlize() メソッドはシリアル化操作を実行するために使用されます。

「プロデューサーが使用するシリアライザーとコンシューマーが使用するデシリアライザーは 1 対 1 で対応している必要があります」

もちろん、Kafka が提供するシリアライザーを使用するだけでなく、シリアライザーをカスタマイズすることもできます。

「学生.クラス」:

  1. @データ
  2. パブリッククラスStudent {
  3.  
  4. プライベート文字列;
  5.  
  6. プライベート文字列コメント;
  7. }

「MySerializer」:

"使用":

  1. プロパティを設定します(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG、MySerializer.class.getName());

プロパティに独自のシリアライザーを配置するだけです。意外と簡単!

パーティショナーとは何ですか?

send() メソッドを通じてブローカーにメッセージを送信するプロセスでは、「インターセプター」、「シリアライザー」、および「パーティショナー」を通過する必要がある場合があります。

このうち、「インターセプター」は必須ではありませんが、「シリアライザー」は必須です。シリアライザーを通過した後、送信先のパーティションを決定する必要があります。メッセージ ProducerRecord でパーティション フィールドが指定されている場合、パーティションは送信先のパーティション番号を表すため、「partitioner」は必要ありません。

  1. パッケージ org.apache.kafka.clients.producer;
  2.  
  3. パブリックインターフェースPartitionerはConfigurableとCloseableを拡張します{
  4. intパーティション(文字列トピック、オブジェクトキー、byte[] keyBytes、オブジェクト値、byte[] valueBytes、クラスタークラスター);
  5. voidを閉じる();
  6. }

上記は Kafka の Partitioner インターフェースです。パーティション番号を計算し、int 値を返すメソッドpartition()があることがわかります。 6 つのパラメータは次のものを表します。

  1. トピック: 件名
  2. キー: キー
  3. keyBytes: シリアル化されたキー
  4. 値: 値
  5. valueBytes: シリアル化された値
  6. クラスター: クラスターのメタデータ情報

メインのパーティション割り当てロジックは、partition() メソッドで定義されます。キーが空でない場合、デフォルトのパーティショナーはキーをハッシュし (MurmurHash2 アルゴリズムを使用)、取得したハッシュ値に基づいてパーティション番号を最終的に計算します。同じキーを持つメッセージは同じパーティションに書き込まれます。キーが空の場合、メッセージはトピック内の使用可能な各パーティションにラウンドロビン方式で送信されます。

「キーが null でない場合、計算されたパーティション番号はすべてのパーティションのいずれかになります。キーが空の場合、計算されたパーティション番号は使用可能なパーティションのいずれか 1 つになります。」

もちろん、パーティショナーもカスタマイズできます。操作は次のとおりです。

「MyPartitioner.クラス」:

"使用":

プロパティを設定します(ProducerConfig.PARTITIONER_CLASS_CONFIG、MyPartitioner.class.getName());

カスタムパーティショナーも使いやすく、Partitionerインターフェースを実装するだけです。

インターセプターが来る?

Web 開発を行う学生であれば、おそらくインターセプターについてよくご存知でしょう。 Kafka にはインターセプターの機能もあり、これは「プロデューサー インターセプター」と「コンシューマー インターセプター」に分かれています。

プロデューサー インターセプターは、特定のルールに従って要件を満たさないメッセージをフィルタリングしたり、メッセージの内容を変更したりするなど、メッセージを送信する前にいくつかの準備作業を実行できます。また、コールバック ロジックを送信する前にいくつかのカスタマイズされた要件を作成するためにも使用できます。

その後、必要に応じてカスタマイズを行います。インターセプターをカスタマイズする場合は、ProducerInterceptor インターフェースを実装するだけで済みます。

  1. パッケージ org.apache.kafka.clients.producer;
  2.  
  3. パブリックインターフェース ProducerInterceptor <K, V> は Configurable を拡張します {
  4. ProducerRecord<K,V> をプロデューサー レコードに送信します。
  5.  
  6. void onAcknowledgement(RecordMetadata recordMetadata、例外 e);
  7.  
  8. voidを閉じる();
  9. }

onSend() メソッドは、メッセージに対して対応するカスタマイズされた操作を実行でき、メッセージが送信に失敗する前、またはメッセージが確認される (Acknowledgement) 前に onAcknowledgement() メソッドが呼び出され、ユーザーが設定したコールバックよりも優先されます。

カスタム インターセプターは次のとおりです: MyProducerInterceptor.class:

onSend() メソッドでは、送信するメッセージを変更します。 onAcknowledgement() メソッドでは、成功したメッセージと失敗したメッセージの数をカウントします。次に、close() メソッドで、成功したメッセージと失敗したメッセージの数を出力します。

同じ使用法:

  1. プロパティを設定します(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG、MyProducerInterceptor.class.getName());

インターセプターがあると、自然にインターセプター チェーンが形成されます。複数のインターセプターをカスタマイズし、それらをプロパティ ファイルで宣言することができます。

  1. プロパティを設定します(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG、MyProducerInterceptor1.class.getName() + "," + MyProducerInterceptor2.class.getName());

「こうすることで、次のインターセプターは前のインターセプターの出力に依存することになります」

重要なパラメータ

上記のパラメータに加えて、他にも重要なパラメータがいくつかあります。

1. ああ

このパラメータは、プロデューサーがメッセージを受け取る前にパーティション内のレプリカがいくつ必要かを指定するために使用されます。

  1. properties.put(ProducerConfig.ACKSCONFIG、"0"); //文字列型であることに注意してください

メッセージは正常に書き込まれました。 ackには3種類の値(文字列)があります

  1. acks = 1: デフォルト値は 1 です。プロデューサーがメッセージを送信した後、パーティションのリーダー レプリカがメッセージを正常に書き込むと、サーバーから正常な応答を受信します。メッセージがリーダー レプリカに書き込まれ、プロデューサーに成功応答が返された場合でも、他のフォロワー レプリカによってプルされる前にリーダー レプリカがクラッシュすると、メッセージは失われます。
  2. acks = 0: プロデューサーは、メッセージを送信した後、サーバーからの応答を待つ必要はありません。メッセージを送信して Kafka に書き込むプロセスで何らかの例外が発生し、Kafka がメッセージを受信できない場合、プロデューサーはそれを知ることができず、メッセージは失われます。他の構成が同じであれば、acks を 0 に設定すると、最大のスループットを実現できます。
  3. acks = -1 または acks = all: メッセージを送信した後、プロデューサーは、サーバーから正常な応答を受信する前に、ISR 内のすべてのレプリカがメッセージを正常に書き込むのを待機する必要があります。他の構成環境が同じ場合、acks を 1 または (all) に設定すると、最も強力な信頼性を実現できます。

設定:

  1. properties.put(ProducerConfig.ACKSCONFIG、"0"); //文字列型であることに注意してください

2. 最大リクエストサイズ

プロデューサー クライアントが送信できるメッセージの最大値を制限するために使用されます。デフォルト値は 1048576B、つまり 1MB です。

3. 再試行

プロデューサーの再試行回数を構成するために使用されます。デフォルト値は 0 です。これは、例外が発生しても再試行が実行されないことを意味します。

4. 再試行.バックオフ.ms

無効な頻繁な再試行を回避するために、2 回の再試行間の時間間隔を設定するために使用されます。デフォルト値は100です

5. 接続最大アイドル時間

このパラメータは、制限された接続を閉じるのにかかる時間を指定するために使用されます。デフォルト値は 540000 (ms)、つまり 9 分です。

6.バッファメモリ

キャッシュされたメッセージのバッファサイズを設定するために使用します

7.バッチサイズ

再利用可能なメモリ領域のサイズを設定するために使用します

Kafka コンシューマー グループ

生産があれば消費もある、そうでしょう?生産者に対応するのは消費者です。アプリケーションは KafkaConsumer を通じてトピックをサブスクライブし、サブスクライブしたトピックからメッセージをプルできます。

個人とグループ?

各コンシューマーには対応するコンシューマー グループがあります。コンシューマーは、Kafka 内のトピックをサブスクライブし、サブスクライブしたトピックからメッセージをプルする責任を負います。メッセージがトピックに公開されると、そのメッセージは、そのメッセージをサブスクライブしている各コンシューマー グループ内の 1 つのコンシューマーにのみ配信されます。

コンシューマー グループ内にコンシューマーが 1 つしかない場合は、次のようになります。

コンシューマー グループに 2 人のコンシューマーがいる場合、状況は次のようになります。

上記の分布から、消費者の増加に伴い、全体的な消費能力は水平方向に拡張可能であることがわかります。消費者の数を増やす(または減らす)ことで、全体的な購買力を高める(または下げる)ことができます。当時、パーティションの数が固定されていたため、消費者を盲目的に追加しても消費能力が継続的に増加することはありませんでした。消費者が多すぎると、消費者の数がパーティションの数よりも多くなり、一部の消費者はどのパーティションにも割り当てられなくなります。

上記の割り当てロジックは、デフォルトのパーティション割り当て戦略に基づいて分析されます。コンシューマー クライアントでpartition.assignment.strategyを構成することにより、コンシューマーとサブスクリプション トピック間のパーティション割り当て戦略を設定できます。

配送モード

Kafka には 2 つのメッセージ配信モードがあります。

ポイントツーポイント

キューベースでは、メッセージプロデューサーがキューにメッセージを送信し、メッセージコンシューマーがキューからメッセージを受信します。

パブリッシュ/サブスクライブ

トピックに基づいて、トピックはメッセージ配信の仲介者と見なすことができます。メッセージ パブリッシャーはトピックにメッセージをパブリッシュし、メッセージ サブスクライバーはトピックからメッセージをサブスクライブします。トピックは、メッセージのサブスクライバーとパブリッシャーを互いに独立させ、接触なしでのメッセージの配信を保証します。パブリッシュ/サブスクライブ モデルは、1 対多のメッセージ ブロードキャストに使用されます。

クライアント開発

消費プロセスには通常、次の手順が必要です。

  • コンシューマークライアントパラメータを設定し、対応するコンシューマーインスタンスを作成する
  • トピックを購読する
  • メッセージをプルして消費する
  • 消費変位を提出する
  • コンシューマーインスタンスを閉じる

コンシューマー パラメータを構成するときに、いくつかのよく知られたパラメータを確認したことがわかります。

  • bootstrap.servers: 書き込みエラーを防ぐために、ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG を使用して、Kafka クラスターに接続するために必要なブローカー アドレスのリストを指定できます。カンマで区切って 1 つ以上のアドレスを設定できます。デフォルト値は「」です
  • group.id: 書き込みエラーを防ぐために、ConsumerConfig.GROUP_ID_CONFIG を使用して、コンシューマーが属するコンシューマー グループの名前を表すことができます。デフォルト値は「 」です。空に設定すると、例外がスローされます。
  • key.deserializer/value.deserializer: 書き込みエラーを防ぐために、ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG と ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG を使用して、コンシューマーがレスポンスに対して実行する必要があるデシリアライズ操作がプロダクション側と一致している必要があることを示すことができます。

client.id: 書き込みエラーを防ぐために、ConsumerConfig.CLIENT_ID_CONFIG を使用して、KafkaConsumer に対応するクライアント ID を設定できます。デフォルト値は「」です

トピックの購読

コンシューマーがメッセージを消費するには、対応するトピックをサブスクライブすることが重要です。上記の例では、consumer.subscribe(Arrays.asList(topic)); を通じてトピックをサブスクライブします。コンシューマーは 1 つ以上のトピックをサブスクライブできることがわかります。 subscribe() メソッドのオーバーロードを見てみましょう。

  1. public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { /* コンパイルされたコード */ }
  2.  
  3. public void subscribe(Collection<String> topics) { /* コンパイルされたコード */ }
  4.  
  5. public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { /* コンパイルされたコード */ }
  6.  
  7. public void subscribe(Pattern pattern) { /* コンパイルされたコード */ }

サブスクリプション プロセス中に次のことが発生した場合:

  1. consumer.subscribe(Arrays.asList(topic1));
  2. consumer.subscribe(Arrays.asList(topic2));

すると、最終的には topic1 ではなく topic2 のみがサブスクライブされ、topic1 と topic2 の組み合わせはサブスクライブされなくなります。

subscribe() メソッドは正規表現もサポートするようにオーバーロードされています。

  1. consumer.subscribe(Pattern.compile("topic.*"));

この構成では、誰かが新しいトピックを作成し、そのトピック名が正規表現と一致する場合、このコンシューマーは新しく追加されたトピックからのメッセージを消費できます。

トピックと正規表現をパラメータとして渡すことに加えて、subscribe() メソッドには、対応する再バランス リスナーを設定するために使用される ConsumerRebalanceListener パラメータの受け渡しをサポートする 2 つのメソッドもあります。

subscribe() メソッドを介してトピックをサブスクライブすることに加えて、コンシューマーは、assign() メソッドを介して特定のトピックの特定のパーティションを直接サブスクライブすることもできます。

  1. パブリックvoid 割り当て (コレクション <TopicPartition> パーティション)

TopicPartition オブジェクトは次のように定義されます。

コンストラクターは、「サブスクライブされたトピック」と「パーティション番号」を渡す必要があります。これは次のように使用できます。

  1. コンシューマーにArrays.asList(新しいTopicPartition("kafka-demo", 0))を割り当てます。

このようにして、kafka-demo のパーティション 0 をサブスクライブできます。

トピック内にパーティションがいくつあるか事前にわからない場合はどうすればよいでしょうか? KafkaConsumer のpartitionsFor() メソッドを使用して、指定されたトピックのメタデータ情報を照会できます。 partitionsFor() メソッドは次のように定義されます。

  1. パブリックリスト <PartitionInfo> パーティション For(String topic);

PartitionInfo オブジェクトは次のように定義されます。

  1. パブリッククラスPartitioninfo {
  2. プライベート最終文字列トピック; //トピック名
  3. プライベート最終intパーティション; //パーティション番号
  4. プライベート最終ノードリーダー。 //パーティションのリーダーコピーの場所
  5. プライベート最終Node[]レプリカ; // 分割された AR セット
  6. プライベート最終Node[] inSyncReplicas; // パーティション化された ISR コレクション
  7. プライベート最終Node[] offlineReplicas; // パーティション化された OSR コレクション
  8. }

サブスクリプションは悪意を持ってバンドルされていません。購読できる場合は、購読を解除できます。トピックをサブスクライブするには、KafkaConsumer の unsubscribe() メソッドを使用できます。このメソッドは、subscribe(Collection) によって実装されたサブスクリプション、subscribe(Pattem) によって実装されたサブスクリプション、およびassign(Collection) によって実装されたサブスクリプションをキャンセルできます。

  1. 消費者.購読解除();

subscribe(Collection) または assignment(Collection) のコレクション パラメータが空のコレクションに設定されている場合、unsubscribe() メソッドと同等になります。次の例の 3 行のコードは同じ効果があります。

  1. 消費者.購読解除();
  2. consumer.subscribe(新しいArrayList<String>());
  3. コンシューマーに新しいArrayList<TopicPartition>()を割り当てます。

消費パターン

一般的に、メッセージの消費モードには「プッシュ モード」と「プル モード」の 2 つがあります。 Kafkaの消費は「プルモデル」に基づいています

プッシュモード: サーバーは積極的にメッセージを消費者にプッシュします

プルモード: コンシューマーがサーバーにプルリクエストを積極的に開始する

Kafka のメッセージの消費は継続的なポーリング プロセスです。コンシューマーが行う必要があるのは、poll() メソッドを繰り返し呼び出すことだけです。一部のパーティションに消費可能なメッセージがない場合、このパーティションに対応するメッセージをプルした結果は空になります。サブスクライブされたすべてのパーティションに消費可能なメッセージがない場合、poll() メソッドは空のメッセージ コレクションを返します。

  1. public ConsumerRecords<K, V> ポーリング(最終期間タイムアウト)

タイムアウト パラメータ timeout を poll() メソッドに渡すことで、poll() メソッドのブロック時間を制御できます。ブロッキングは、コンシューマーのバッファーに利用可能なデータがない場合に発生します。

poll() メソッドによって取得されるメッセージは ConsumerRecord オブジェクトであり、次のように定義されます。

メッセージを消費するときに、ConsumerRecord 内の関心のあるフィールドに対して特定のビジネス ロジック処理を直接実行できます。

消費者インターセプター

プロデューサー インターセプターの使用についてはすでに上で説明しました。もちろん、消費者にも応答性の高いインターセプターという概念があります。コンシューマー インターセプターは主に、メッセージを消費するとき、または消費変位を送信するときに、いくつかのカスタマイズされた操作を実行します。

プロデューサーは ProducerInterceptor インターフェースを実装してインターセプターを定義し、コンシューマーは ConsumerInterceptor インターフェースを実装してインターセプターを定義します。 ConsumerInterceptor は次のように定義されます。

  1. パッケージ org.apache.kafka.clients.consumer;
  2.  
  3. パブリックインターフェース ConsumerInterceptor <K, V> は Configurable、AutoCloseable を拡張します {
  4. ConsumerRecords<K,V> を ConsumerRecords<K,V> で消費します。
  5.  
  6. マップをコミットせずに、マップをコミットします。
  7.  
  8. voidを閉じる();
  9. }
  • onConsume(): KafkaConsumer は、poll() メソッドが返される前にインターセプターの onConsume() メソッドを呼び出して、返されたメッセージの内容を変更したり、特定のルールに従ってメッセージをフィルタリングしたりするなど、メッセージに対して対応するカスタマイズされた操作を実行します (これにより、poll() メソッドによって返されるメッセージの数が減る可能性があります)。 onConsume() メソッドで例外がスローされた場合、例外はキャッチされてログに記録されますが、例外は渡されません。
  • onCommit(): KafkaConsumer は、コンシューマーの置き換えを送信した後、インターセプターの onCommit() メソッドを呼び出します。この方法を使用すると、送信された変位情報を記録および追跡できます。たとえば、コンシューマーが commitSync パラメータなしメソッドを使用する場合、送信されたコンシューマーの変位の具体的な詳細はわかりませんが、インターセプターの onCommit() メソッドはこれを実行できます。

インターセプターをカスタマイズした後も同じ方法を使用しました。

  1. プロパティを設定します(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG、MyConsumerInterceptor.class.getName());

重要なパラメータ

上記のパラメータに加えて、他にも重要なパラメータがいくつかあります。

1. 最小バイト数を取得する

このパラメータは、プル リクエスト (poll() メソッドの呼び出し) でコンシューマーが Kafka からプルできるデータの最小量を構成するために使用されます。デフォルト値は 1B です。返されるデータの量がこのパラメータで設定された値より少ない場合、データの量がこのパラメータで設定されたサイズを満たすまで待機する必要があります。

2. フェッチ最大バイト数

このパラメータは、コンシューマーが 1 回のプル リクエストで Kafka からプルできるデータの最大量を構成するために使用されます。デフォルト値は52428800 B (50M)です。

3. フェッチ最大待ち時間ms

このパラメータは、Kafka の待機時間を指定するために使用されます。デフォルト値は 500 ミリ秒です。

4. 最大パーティションフェッチバイト

このパラメータは、各パーティションからコンシューマーに返されるデータの最大量を構成するために使用されます。デフォルト値は1048576 B (1MB)です。

5. 最大投票レコード数

このパラメータは、コンシューマーが 1 つのプル リクエストでプルするメッセージの最大数を構成するために使用されます。デフォルト値は 500 です。

6. リクエストタイムアウト.ms

このパラメータは、コンシューマーがリクエスト応答を待機する最大時間を構成するために使用されます。デフォルト値は 30000 ミリ秒です。

Kafka トピック管理

これまでのプロデューサー側とコンシューマー側で、「トピック」の概念をすでに見てきました。 「トピック」は Kafka の中核です。

メッセージの分類として、トピックはさらに 1 つ以上のパーティションに分割できます。パーティションは、メッセージの二次分類と見なすこともできます。パーティション分割は、Kafka にスケーラビリティと水平拡張機能を提供するだけでなく、マルチコピー メカニズムを通じて Kafka にデータ冗長性を提供し、データの信頼性を向上させます。

1. トピックを作成する

ブローカー側には auto.create.topics.enable と呼ばれる構成パラメータがあります (デフォルト値は true)。このパラメータが「true」の場合、プロデューサーがまだ作成されていないトピックにメッセージを送信すると、num.partitions(デフォルト値は 1)とレプリケーション係数 default.replication.factor(デフォルト値は 1)を持つトピックが自動的に作成されます。

「スクリプトを使用して作成」:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic kafka-demo --partitions 4 --replication-factor 2  

「TopicCommand を使用してトピックを作成する」:

Maven 依存関係をエクスポートします。

  1. <依存関係>
  2. <グループ ID>org.apache.kafka</グループ ID>
  3. <アーティファクトID>kafka_2.11</アーティファクトID>
  4. <バージョン>2.0.0</バージョン>
  5. </依存関係>
  1. 公共 静的void createTopic(String topicName) {
  2. 文字列[]オプション = 新しい文字列[]{
  3. "--zookeeper" "localhost:2181/kafka"
  4. " - 作成する"
  5. "--レプリケーション係数" , "2" ,
  6. "--パーティション" , "4" ,
  7. "--トピック" 、トピック名
  8. };
  9. kafka.admin.TopicCommand.main(オプション);
  10. }

上記の例では、パーティションが 4 つあり、レプリケーション係数が 2 のトピックが作成されます。

2. テーマを表示する

  • -リスト:

現在利用可能なすべてのテーマは、list コマンドを使用して表示できます。

  1. bin/kafka-topics.sh --zookeeper ローカルホスト:2181/kafka -list  
  • 説明する

describe コマンドを使用して、単一のトピックの情報を表示できます。 --topic コマンドを使用してトピックを指定しない場合は、すべてのトピックの詳細情報が表示されます。 --topic は複数のトピックの指定もサポートします:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic kafka-demo1、kafka-demo2  

3. テーマを変更する

トピックを作成した後、alter コマンドを使用して、パーティション数の変更、構成の変更など、トピックに変更を加えることができます。

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic kafka- demo --partitions 3  

パーティションを変更するときは、次の点に注意する必要があります。

トピック kafka-demo のパーティション数が 1 の場合、メッセージのキー値が何であっても、メッセージはこのパーティションに送信されます。パーティション数が 3 に増えると、メッセージのキーに基づいてパーティション番号が計算されます。元々パーティション 0 に送信されたメッセージは、パーティション 1 またはパーティション 2 に送信される場合があります。そのため、最初にパーティションの数を設定することをお勧めします。

現在、Kafka はパーティション数の増加のみをサポートしており、削減はサポートしていません。トピック kafka-demo のパーティション数を 1 に変更する場合、InvalidPartitionException が報告されます。

4. トピックを削除する

トピックを今後使用しないことが確実な場合は、トピックを削除するのが最善の方法です。これにより、ディスク、ファイル ハンドルなどのリソースを解放できます。この時点で、delete コマンドを使用してトピックを削除できます。

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic kafka-demo  

トピックを削除するには、ブローカーの delete.topic.enable パラメータを true に設定する必要があることに注意してください。このパラメータのデフォルト値は true です。 false に設定されている場合、トピックを削除する操作は無視されます。

削除するトピックが Kafka の内部トピックである場合、削除時にエラーが報告されます。例: __consumer_offsets および __transaction_state

共通パラメータ

パラメータ名解釈
変えるパーティションの数やトピックの構成など、トピックを変更するために使用されます。
config<キーと値のペア> トピックレベルのパラメータを設定するために使用するトピックを作成または変更します
作成するトピックを作成する
消去トピックを削除する
delete-config <設定名> テーマレベルで上書きされた設定を削除する
説明するトピックの詳細を表示する
ラック認識を無効にするトピックを作成すると、ラック情報が考慮されません
ヘルプヘルプ情報を印刷します
if-Exists トピックを変更または削除するときに使用されます。操作は、トピックが存在する場合にのみ実行されます。
if-exists トピックを作成するときに使用されます。アクションは、トピックが存在しない場合にのみ実行されます。
リスト利用可能なすべてのテーマをリストします
パーティション<パーティションの数> トピックを作成したり、パーティションを追加したりするときに、パーティションの数を指定します
Replica-assignment <Allocation Scheme> パーティションレプリカ割り当てスキームを手動で指定します
Replication-Factor <コピー数> トピックを作成するときに複製係数を指定します
トピック<トピック名> トピック名を指定します
オーバーライドのトピック descritionを使用してトピック情報を表示する場合、オーバーライドされた構成を含むトピックのみが表示されます。
接続のZookeeperアドレス情報を指定します

上記はおおよそカフカの紹介です。今日の知識はここに紹介されています。コンテンツはそれほど詳細ではありませんが、単語の数は小さくありません。あなたがそれを読み終えることができれば、Xiaocaiはあなたに親指を与えます!

<<:  Prometheus監視プラットフォームを導入する際には、6つの要素を考慮する必要がありますが、どれも無視することはできません。

>>:  CTO が混乱を避けるためのクラウド イノベーションのヒントを提供

推薦する

ジェイドエンタープライズウェブサイト最適化の実践分析

Baidu のホームページで上位にランクインしたい場合、ウェブサイトのランキングを向上させるためにい...

外部リンクへの道は長い。2013年に独占リンクを作成

2012年に百度がアルゴリズムを何度も更新した後、今年はすべてのウェブマスターが少し成熟したと思いま...

地域観光小規模ポータルの設立計画プロセス

著者はプロの SEO 運用および保守担当者です。多くの SEO 担当者と同様に、私は日中は仕事に行き...

Pythonを使用して、初心者でも理解できる分散型Zhihuクローラーを作成します。

Zhihu のユーザーデータを収集するというアイデアは、かなり以前からありました。このアイデアを実現...

最も効果的なオンラインマーケティング手法の分析

オンラインマーケティングには、数え切れないほど多くの活用方法があります。人が使う媒体であれば、マーケ...

最初にウェブサイトを運営し、宣伝する方法

初期の開発中に Web サイトを引き継いだ Web マスターであれば、おめでとうございます。これは良...

ウェブサイト運営の失敗の原因を理解し、ネットワーク起業のリスクを回避する

あっという間に、2012年の旧正月が過ぎました。新年の喜びが完全に薄れる前に、多くの草の根ウェブマス...

Xigua Videoの中編動画の躍進

動画市場をみると、長編動画プラットフォームはiQiyi、Youku、Tencent Videoが占め...

商品説明ページを作成する際に注意すべき点

商品説明ページを作成する前に、まず1つのことを明確に考える必要があります。それは、商品説明ページで最...

日々の話題:Xiaomiのような存在になったVanclは、かつての栄光を取り戻すことができるのか?

A5ウェブマスターネットワーク(www.admin5.com)は4月1日、かつて数多くの若者に愛され...

CrownCloud - 1GBメモリ/月額7ドル

つい最近設立された crowncloud は、openvz と KVM をベースにした VPS を提...

アリババDAMOアカデミーヤングオレンジ賞「ハードコア10人」受賞者が発表、鍾南山が若手科学者に激励の言葉を送る

9月9日、2020年度アリババDAMOアカデミーヤングオレンジ賞の受賞者が発表され、梁文華氏と他の1...

ウェブサイト構築に関するよくある7つの誤解をご存知ですか?

このウェブサイトは企業情報の公開のみを目的としています企業のウェブサイト構築は、まずマーケティングデ...

Hiformance: $14.99/年、4 コア/8g メモリ/80g SSD/8T トラフィック/2IP

Hiformance の最新の電子メールは、ハイエンドで安価な VPS、OpenVZ 仮想ロサンゼル...

クラウドネイティブ データレイク アーキテクチャにおけるサーバーレス Kafka

[[418139]] [51CTO.com クイック翻訳]データレイクを補完する動的データを処理する...