この記事はWeChatの公開アカウント「小蔡良基」から転載したもので、著者は蔡歩才です。この記事を転載する場合は、Xiaocailiangji公式アカウントまでご連絡ください。 初期のカフカ 1. はじめに Kafka はもともと、ZooKeeper コーディネーションに基づくマルチパーティション、マルチレプリカの分散メッセージング システムとして、Scala 言語を使用して Linkedin によって開発されました。現在、Apache Foundation に寄贈されています。現在、Kafka は分散ストリーム処理プラットフォームとして位置付けられています。高いスループット、永続性、水平スケーラビリティ、ストリーム データ処理のサポートなどの機能により、広く使用されています。 2. 使用シナリオ メッセージング システム: Kafka と従来のメッセージング システム (メッセージング ミドルウェア) はどちらも、システム分離、冗長ストレージ、トラフィック ピークの削減、バッファリング、非同期通信、スケーラビリティ、回復可能性などの機能を備えています。同時に、Kafka は、ほとんどのメッセージング システムでは実現が難しいメッセージ順序の保証とバックトラッキング消費機能も提供します。 ストレージ システム: Kafka はメッセージをディスクに保存するため、他のメモリベースのストレージ システムと比較してデータ損失のリスクが効果的に軽減されます。 Kafka のメッセージ永続化機能とマルチコピーメカニズムのおかげで、Kafka を長期データストレージシステムとして使用できます。対応するデータ保持ポリシーを「永続的」に設定するか、トピックのログ圧縮機能を有効にするだけです。 ストリーム処理プラットフォーム: Kafka は、一般的なストリーム処理フレームワークごとに信頼性の高いデータ ソースを提供するだけでなく、ウィンドウ、接続、交換、集約などの操作を含む完全なストリーム処理ライブラリも提供します。 3. 基本概念 Kafka システム アーキテクチャには、複数の「プロデューサー」、「ブローカー」、「コンシューマー」、および ZooKeeper クラスターが含まれます。
Kafka システム全体は、おおよそ上記の部分で構成されています。さらに、特に重要な概念が2つあります。トピックとパーティションです。
Kafka はパーティションのマルチコピー (レプリカ) メカニズムを導入し、レプリカの数を増やすことで災害復旧機能を向上させることができます。 同じメッセージは同じパーティションの異なるレプリカに保存されます (レプリカは同時にまったく同じではありません)。レプリカには「1 つのマスターと複数のスレーブ」の関係があり、リーダー レプリカは読み取りおよび書き込み要求の処理を担当し、フォロワー レプリカはリーダー レプリカとのメッセージの同期のみを担当します。レプリカは異なるブローカーにあります。リーダー レプリカに障害が発生すると、外部サービスを提供するためにフォロワー レプリカから新しいリーダー レプリカが再選出されます。 「Kafka は、マルチコピー メカニズムを通じて自動フェイルオーバーを実現します。これにより、Kafka クラスター内のブローカーに障害が発生した場合でも、サービスが引き続き利用可能になります。」 Kafka の理解を続ける前に、いくつかのキーワードを理解する必要があります。
上記の関係から、次の式を導き出すことができます: AR=ISR+OSR
これを見た多くの友人は少し焦っていると思います。カフカはなぜ難しいのでしょうか?それでもうまく習得できるでしょうか? 心配しないでください。まずは理論的な知識を学ぶ必要があります。これは落胆の始まりではなく、成長の始まりです。以下のレシピは、最も簡単な文章を使用して、最も深い穴に導くことを目指しています。 カフカ制作チーム ご存知のとおり、Kafka は高度な意味では分散メッセージ キューですが、簡単に言えば単なるメッセージ キューです。簡単に言えば、メッセージ キューはデータをプッシュしたり取得したりするためのものです。そうです、高度な知識を得るには、多くの場合、単純な理解が必要です。 では、データはどこから来るのでしょうか?データは制作チームから提供されました!プログラミングの観点から見ると、プロダクション チームにはプロデューサーのグループがあり (もちろん 1 つだけの場合もあります)、プロデューサーは Kafka にメッセージを送信する役割を担うアプリケーションです。 クライアント開発 製造プロセスには、一般的に次の手順が含まれます。
「4 つのステップと 1 つのシャトルで生産上の問題を解決」 上記のコードでは、プロパティ ファイルに次の 4 つのパラメータを指定していることがわかります。
ProducerRecord の定義は次のとおりです。
上記の操作は、プロデューサー インスタンスを作成し、メッセージを構築することです。メッセージを送信するための主なモードは 3 つあります。
上記で使用した送信方法は、ファイア アンド フォーゲットです。メッセージが正しく到着したかどうかを気にせずに、Kafka にメッセージを送信するだけです。ほとんどの場合、この送信方法では問題はありませんが、場合によっては (再試行できない例外が発生する) メッセージが失われることがあります。 「この配信方法は最高のパフォーマンスを提供しますが、信頼性は最も低くなります。」
sendメソッドからはFutureオブジェクトが返される。
これは、send() メソッド自体が非同期であり、send() メソッドによって返される Future オブジェクトにより、呼び出し元が後で送信の結果を取得できることを示しています。同期効果を実現したい場合は、Future の get() メソッドを直接呼び出すことができます。
get() メソッドを使用して、メッセージが正常に送信されるか例外が発生するまで、Kafka の応答をブロックして待機します。 生産を非同期にすることはできますか? Kafka では、send() メソッドに別のオーバーロードがあります。
コールバックメソッドは非常にシンプルで明確です。 Kafka は応答するとコールバックし、正常に送信するか例外をスローします。 onCompletion() メソッドの 2 つのパラメーターは相互に排他的です。送信が成功した場合、RecordMetadata は空ではなく、Exception は空になります。送信に失敗した場合は、その逆になります。 製作上難しいところはありますか? KafkaProducer で一般的に発生する例外には 2 種類あります。
ネットワーク例外、リーダー利用不可例外、不明トピックまたはパーティション例外、 レプリカが十分でない例外、コーディネーターが不十分な例外
RecordTooLargeException など 再試行可能な例外の場合は、「retries」パラメータを設定できます。指定された再試行回数内に回復した場合、例外はスローされません。 「retries」パラメータのデフォルト値は 0 です。設定方法は次のとおりです。
上記の例では、再試行回数は 10 回です。10 回後に回復が失敗すると、例外がスローされます。 RecordTooLargeException などの再試行不可能な例外は、送信されるメッセージが大きすぎる場合、再試行は実行されず、例外が直接スローされることを示します。 シリアル化で役立つ プロデューサーは、ネットワーク経由で Kafka に送信する前に、シリアライザーを使用してオブジェクトをバイト配列に変換する必要があります。対応するコンシューマーも、デシリアライザーを使用して、Kafka で受信したバイト配列を対応するオブジェクトに変換する必要があります。 上記のコードで使用されているStringSerializerはSerializerインターフェースを実装しています。 configure() メソッドは現在のクラスを構成するために使用され、serizlize() メソッドはシリアル化操作を実行するために使用されます。 「プロデューサーが使用するシリアライザーとコンシューマーが使用するデシリアライザーは 1 対 1 で対応している必要があります」 もちろん、Kafka が提供するシリアライザーを使用するだけでなく、シリアライザーをカスタマイズすることもできます。 「学生.クラス」:
「MySerializer」: "使用":
プロパティに独自のシリアライザーを配置するだけです。意外と簡単! パーティショナーとは何ですか? send() メソッドを通じてブローカーにメッセージを送信するプロセスでは、「インターセプター」、「シリアライザー」、および「パーティショナー」を通過する必要がある場合があります。 このうち、「インターセプター」は必須ではありませんが、「シリアライザー」は必須です。シリアライザーを通過した後、送信先のパーティションを決定する必要があります。メッセージ ProducerRecord でパーティション フィールドが指定されている場合、パーティションは送信先のパーティション番号を表すため、「partitioner」は必要ありません。
上記は Kafka の Partitioner インターフェースです。パーティション番号を計算し、int 値を返すメソッドpartition()があることがわかります。 6 つのパラメータは次のものを表します。
メインのパーティション割り当てロジックは、partition() メソッドで定義されます。キーが空でない場合、デフォルトのパーティショナーはキーをハッシュし (MurmurHash2 アルゴリズムを使用)、取得したハッシュ値に基づいてパーティション番号を最終的に計算します。同じキーを持つメッセージは同じパーティションに書き込まれます。キーが空の場合、メッセージはトピック内の使用可能な各パーティションにラウンドロビン方式で送信されます。 「キーが null でない場合、計算されたパーティション番号はすべてのパーティションのいずれかになります。キーが空の場合、計算されたパーティション番号は使用可能なパーティションのいずれか 1 つになります。」 もちろん、パーティショナーもカスタマイズできます。操作は次のとおりです。 「MyPartitioner.クラス」: "使用": プロパティを設定します(ProducerConfig.PARTITIONER_CLASS_CONFIG、MyPartitioner.class.getName()); カスタムパーティショナーも使いやすく、Partitionerインターフェースを実装するだけです。 インターセプターが来る? Web 開発を行う学生であれば、おそらくインターセプターについてよくご存知でしょう。 Kafka にはインターセプターの機能もあり、これは「プロデューサー インターセプター」と「コンシューマー インターセプター」に分かれています。 プロデューサー インターセプターは、特定のルールに従って要件を満たさないメッセージをフィルタリングしたり、メッセージの内容を変更したりするなど、メッセージを送信する前にいくつかの準備作業を実行できます。また、コールバック ロジックを送信する前にいくつかのカスタマイズされた要件を作成するためにも使用できます。 その後、必要に応じてカスタマイズを行います。インターセプターをカスタマイズする場合は、ProducerInterceptor インターフェースを実装するだけで済みます。
onSend() メソッドは、メッセージに対して対応するカスタマイズされた操作を実行でき、メッセージが送信に失敗する前、またはメッセージが確認される (Acknowledgement) 前に onAcknowledgement() メソッドが呼び出され、ユーザーが設定したコールバックよりも優先されます。 カスタム インターセプターは次のとおりです: MyProducerInterceptor.class: onSend() メソッドでは、送信するメッセージを変更します。 onAcknowledgement() メソッドでは、成功したメッセージと失敗したメッセージの数をカウントします。次に、close() メソッドで、成功したメッセージと失敗したメッセージの数を出力します。 同じ使用法:
インターセプターがあると、自然にインターセプター チェーンが形成されます。複数のインターセプターをカスタマイズし、それらをプロパティ ファイルで宣言することができます。
「こうすることで、次のインターセプターは前のインターセプターの出力に依存することになります」 重要なパラメータ 上記のパラメータに加えて、他にも重要なパラメータがいくつかあります。 1. ああ このパラメータは、プロデューサーがメッセージを受け取る前にパーティション内のレプリカがいくつ必要かを指定するために使用されます。
メッセージは正常に書き込まれました。 ackには3種類の値(文字列)があります
設定:
2. 最大リクエストサイズ プロデューサー クライアントが送信できるメッセージの最大値を制限するために使用されます。デフォルト値は 1048576B、つまり 1MB です。 3. 再試行 プロデューサーの再試行回数を構成するために使用されます。デフォルト値は 0 です。これは、例外が発生しても再試行が実行されないことを意味します。 4. 再試行.バックオフ.ms 無効な頻繁な再試行を回避するために、2 回の再試行間の時間間隔を設定するために使用されます。デフォルト値は100です 5. 接続最大アイドル時間 このパラメータは、制限された接続を閉じるのにかかる時間を指定するために使用されます。デフォルト値は 540000 (ms)、つまり 9 分です。 6.バッファメモリ キャッシュされたメッセージのバッファサイズを設定するために使用します 7.バッチサイズ 再利用可能なメモリ領域のサイズを設定するために使用します Kafka コンシューマー グループ 生産があれば消費もある、そうでしょう?生産者に対応するのは消費者です。アプリケーションは KafkaConsumer を通じてトピックをサブスクライブし、サブスクライブしたトピックからメッセージをプルできます。 個人とグループ? 各コンシューマーには対応するコンシューマー グループがあります。コンシューマーは、Kafka 内のトピックをサブスクライブし、サブスクライブしたトピックからメッセージをプルする責任を負います。メッセージがトピックに公開されると、そのメッセージは、そのメッセージをサブスクライブしている各コンシューマー グループ内の 1 つのコンシューマーにのみ配信されます。 コンシューマー グループ内にコンシューマーが 1 つしかない場合は、次のようになります。 コンシューマー グループに 2 人のコンシューマーがいる場合、状況は次のようになります。 上記の分布から、消費者の増加に伴い、全体的な消費能力は水平方向に拡張可能であることがわかります。消費者の数を増やす(または減らす)ことで、全体的な購買力を高める(または下げる)ことができます。当時、パーティションの数が固定されていたため、消費者を盲目的に追加しても消費能力が継続的に増加することはありませんでした。消費者が多すぎると、消費者の数がパーティションの数よりも多くなり、一部の消費者はどのパーティションにも割り当てられなくなります。 上記の割り当てロジックは、デフォルトのパーティション割り当て戦略に基づいて分析されます。コンシューマー クライアントでpartition.assignment.strategyを構成することにより、コンシューマーとサブスクリプション トピック間のパーティション割り当て戦略を設定できます。 配送モード Kafka には 2 つのメッセージ配信モードがあります。 ポイントツーポイント キューベースでは、メッセージプロデューサーがキューにメッセージを送信し、メッセージコンシューマーがキューからメッセージを受信します。 パブリッシュ/サブスクライブ トピックに基づいて、トピックはメッセージ配信の仲介者と見なすことができます。メッセージ パブリッシャーはトピックにメッセージをパブリッシュし、メッセージ サブスクライバーはトピックからメッセージをサブスクライブします。トピックは、メッセージのサブスクライバーとパブリッシャーを互いに独立させ、接触なしでのメッセージの配信を保証します。パブリッシュ/サブスクライブ モデルは、1 対多のメッセージ ブロードキャストに使用されます。 クライアント開発 消費プロセスには通常、次の手順が必要です。
コンシューマー パラメータを構成するときに、いくつかのよく知られたパラメータを確認したことがわかります。
client.id: 書き込みエラーを防ぐために、ConsumerConfig.CLIENT_ID_CONFIG を使用して、KafkaConsumer に対応するクライアント ID を設定できます。デフォルト値は「」です トピックの購読 コンシューマーがメッセージを消費するには、対応するトピックをサブスクライブすることが重要です。上記の例では、consumer.subscribe(Arrays.asList(topic)); を通じてトピックをサブスクライブします。コンシューマーは 1 つ以上のトピックをサブスクライブできることがわかります。 subscribe() メソッドのオーバーロードを見てみましょう。
サブスクリプション プロセス中に次のことが発生した場合:
すると、最終的には topic1 ではなく topic2 のみがサブスクライブされ、topic1 と topic2 の組み合わせはサブスクライブされなくなります。 subscribe() メソッドは正規表現もサポートするようにオーバーロードされています。
この構成では、誰かが新しいトピックを作成し、そのトピック名が正規表現と一致する場合、このコンシューマーは新しく追加されたトピックからのメッセージを消費できます。 トピックと正規表現をパラメータとして渡すことに加えて、subscribe() メソッドには、対応する再バランス リスナーを設定するために使用される ConsumerRebalanceListener パラメータの受け渡しをサポートする 2 つのメソッドもあります。 subscribe() メソッドを介してトピックをサブスクライブすることに加えて、コンシューマーは、assign() メソッドを介して特定のトピックの特定のパーティションを直接サブスクライブすることもできます。
TopicPartition オブジェクトは次のように定義されます。 コンストラクターは、「サブスクライブされたトピック」と「パーティション番号」を渡す必要があります。これは次のように使用できます。
このようにして、kafka-demo のパーティション 0 をサブスクライブできます。 トピック内にパーティションがいくつあるか事前にわからない場合はどうすればよいでしょうか? KafkaConsumer のpartitionsFor() メソッドを使用して、指定されたトピックのメタデータ情報を照会できます。 partitionsFor() メソッドは次のように定義されます。
PartitionInfo オブジェクトは次のように定義されます。
サブスクリプションは悪意を持ってバンドルされていません。購読できる場合は、購読を解除できます。トピックをサブスクライブするには、KafkaConsumer の unsubscribe() メソッドを使用できます。このメソッドは、subscribe(Collection) によって実装されたサブスクリプション、subscribe(Pattem) によって実装されたサブスクリプション、およびassign(Collection) によって実装されたサブスクリプションをキャンセルできます。
subscribe(Collection) または assignment(Collection) のコレクション パラメータが空のコレクションに設定されている場合、unsubscribe() メソッドと同等になります。次の例の 3 行のコードは同じ効果があります。
消費パターン 一般的に、メッセージの消費モードには「プッシュ モード」と「プル モード」の 2 つがあります。 Kafkaの消費は「プルモデル」に基づいています プッシュモード: サーバーは積極的にメッセージを消費者にプッシュします プルモード: コンシューマーがサーバーにプルリクエストを積極的に開始する Kafka のメッセージの消費は継続的なポーリング プロセスです。コンシューマーが行う必要があるのは、poll() メソッドを繰り返し呼び出すことだけです。一部のパーティションに消費可能なメッセージがない場合、このパーティションに対応するメッセージをプルした結果は空になります。サブスクライブされたすべてのパーティションに消費可能なメッセージがない場合、poll() メソッドは空のメッセージ コレクションを返します。
タイムアウト パラメータ timeout を poll() メソッドに渡すことで、poll() メソッドのブロック時間を制御できます。ブロッキングは、コンシューマーのバッファーに利用可能なデータがない場合に発生します。 poll() メソッドによって取得されるメッセージは ConsumerRecord オブジェクトであり、次のように定義されます。 メッセージを消費するときに、ConsumerRecord 内の関心のあるフィールドに対して特定のビジネス ロジック処理を直接実行できます。 消費者インターセプター プロデューサー インターセプターの使用についてはすでに上で説明しました。もちろん、消費者にも応答性の高いインターセプターという概念があります。コンシューマー インターセプターは主に、メッセージを消費するとき、または消費変位を送信するときに、いくつかのカスタマイズされた操作を実行します。 プロデューサーは ProducerInterceptor インターフェースを実装してインターセプターを定義し、コンシューマーは ConsumerInterceptor インターフェースを実装してインターセプターを定義します。 ConsumerInterceptor は次のように定義されます。
インターセプターをカスタマイズした後も同じ方法を使用しました。
重要なパラメータ 上記のパラメータに加えて、他にも重要なパラメータがいくつかあります。 1. 最小バイト数を取得する このパラメータは、プル リクエスト (poll() メソッドの呼び出し) でコンシューマーが Kafka からプルできるデータの最小量を構成するために使用されます。デフォルト値は 1B です。返されるデータの量がこのパラメータで設定された値より少ない場合、データの量がこのパラメータで設定されたサイズを満たすまで待機する必要があります。 2. フェッチ最大バイト数 このパラメータは、コンシューマーが 1 回のプル リクエストで Kafka からプルできるデータの最大量を構成するために使用されます。デフォルト値は52428800 B (50M)です。 3. フェッチ最大待ち時間ms このパラメータは、Kafka の待機時間を指定するために使用されます。デフォルト値は 500 ミリ秒です。 4. 最大パーティションフェッチバイト このパラメータは、各パーティションからコンシューマーに返されるデータの最大量を構成するために使用されます。デフォルト値は1048576 B (1MB)です。 5. 最大投票レコード数 このパラメータは、コンシューマーが 1 つのプル リクエストでプルするメッセージの最大数を構成するために使用されます。デフォルト値は 500 です。 6. リクエストタイムアウト.ms このパラメータは、コンシューマーがリクエスト応答を待機する最大時間を構成するために使用されます。デフォルト値は 30000 ミリ秒です。 Kafka トピック管理 これまでのプロデューサー側とコンシューマー側で、「トピック」の概念をすでに見てきました。 「トピック」は Kafka の中核です。 メッセージの分類として、トピックはさらに 1 つ以上のパーティションに分割できます。パーティションは、メッセージの二次分類と見なすこともできます。パーティション分割は、Kafka にスケーラビリティと水平拡張機能を提供するだけでなく、マルチコピー メカニズムを通じて Kafka にデータ冗長性を提供し、データの信頼性を向上させます。 1. トピックを作成する ブローカー側には auto.create.topics.enable と呼ばれる構成パラメータがあります (デフォルト値は true)。このパラメータが「true」の場合、プロデューサーがまだ作成されていないトピックにメッセージを送信すると、num.partitions(デフォルト値は 1)とレプリケーション係数 default.replication.factor(デフォルト値は 1)を持つトピックが自動的に作成されます。 「スクリプトを使用して作成」:
「TopicCommand を使用してトピックを作成する」: Maven 依存関係をエクスポートします。
上記の例では、パーティションが 4 つあり、レプリケーション係数が 2 のトピックが作成されます。 2. テーマを表示する
現在利用可能なすべてのテーマは、list コマンドを使用して表示できます。
describe コマンドを使用して、単一のトピックの情報を表示できます。 --topic コマンドを使用してトピックを指定しない場合は、すべてのトピックの詳細情報が表示されます。 --topic は複数のトピックの指定もサポートします:
3. テーマを変更する トピックを作成した後、alter コマンドを使用して、パーティション数の変更、構成の変更など、トピックに変更を加えることができます。
パーティションを変更するときは、次の点に注意する必要があります。 トピック kafka-demo のパーティション数が 1 の場合、メッセージのキー値が何であっても、メッセージはこのパーティションに送信されます。パーティション数が 3 に増えると、メッセージのキーに基づいてパーティション番号が計算されます。元々パーティション 0 に送信されたメッセージは、パーティション 1 またはパーティション 2 に送信される場合があります。そのため、最初にパーティションの数を設定することをお勧めします。 現在、Kafka はパーティション数の増加のみをサポートしており、削減はサポートしていません。トピック kafka-demo のパーティション数を 1 に変更する場合、InvalidPartitionException が報告されます。 4. トピックを削除する トピックを今後使用しないことが確実な場合は、トピックを削除するのが最善の方法です。これにより、ディスク、ファイル ハンドルなどのリソースを解放できます。この時点で、delete コマンドを使用してトピックを削除できます。
トピックを削除するには、ブローカーの delete.topic.enable パラメータを true に設定する必要があることに注意してください。このパラメータのデフォルト値は true です。 false に設定されている場合、トピックを削除する操作は無視されます。 削除するトピックが Kafka の内部トピックである場合、削除時にエラーが報告されます。例: __consumer_offsets および __transaction_state 共通パラメータ
上記はおおよそカフカの紹介です。今日の知識はここに紹介されています。コンテンツはそれほど詳細ではありませんが、単語の数は小さくありません。あなたがそれを読み終えることができれば、Xiaocaiはあなたに親指を与えます! |
<<: Prometheus監視プラットフォームを導入する際には、6つの要素を考慮する必要がありますが、どれも無視することはできません。
>>: CTO が混乱を避けるためのクラウド イノベーションのヒントを提供
Baidu のホームページで上位にランクインしたい場合、ウェブサイトのランキングを向上させるためにい...
2012年に百度がアルゴリズムを何度も更新した後、今年はすべてのウェブマスターが少し成熟したと思いま...
著者はプロの SEO 運用および保守担当者です。多くの SEO 担当者と同様に、私は日中は仕事に行き...
Zhihu のユーザーデータを収集するというアイデアは、かなり以前からありました。このアイデアを実現...
オンラインマーケティングには、数え切れないほど多くの活用方法があります。人が使う媒体であれば、マーケ...
初期の開発中に Web サイトを引き継いだ Web マスターであれば、おめでとうございます。これは良...
あっという間に、2012年の旧正月が過ぎました。新年の喜びが完全に薄れる前に、多くの草の根ウェブマス...
動画市場をみると、長編動画プラットフォームはiQiyi、Youku、Tencent Videoが占め...
商品説明ページを作成する前に、まず1つのことを明確に考える必要があります。それは、商品説明ページで最...
A5ウェブマスターネットワーク(www.admin5.com)は4月1日、かつて数多くの若者に愛され...
つい最近設立された crowncloud は、openvz と KVM をベースにした VPS を提...
9月9日、2020年度アリババDAMOアカデミーヤングオレンジ賞の受賞者が発表され、梁文華氏と他の1...
このウェブサイトは企業情報の公開のみを目的としています企業のウェブサイト構築は、まずマーケティングデ...
Hiformance の最新の電子メールは、ハイエンドで安価な VPS、OpenVZ 仮想ロサンゼル...
[[418139]] [51CTO.com クイック翻訳]データレイクを補完する動的データを処理する...