この記事はWeChatの公開アカウント「Micro Technology」から転載したもので、著者はMicro Technologyです。この記事を転載する場合は、Micro Technology の公開アカウントにご連絡ください。 みなさんこんにちは、トムです〜 Kafka は多くの人によく知られているオープンソースのメッセージング エンジンですが、ミドルウェア チームのメッセージング システム メンテナーでない限り、そのソース コードを詳しく調べた人はほとんどいません。しかし、どの職業にも独自の専門知識があります。市場には数多くのオープンソース フレームワークがあり、各フレームワークは頻繁にアップグレードされています。各フレームワークのソースコードを深く理解するためにエネルギーを費やすことは現実的ではありません。この記事では、ビジネスの観点から誰もが知っておく必要のある知識をいくつか紹介します。 この記事の内容: まず、なぜ Kafka を使用するのでしょうか?
Kafka の用語をすべて 1 か所にまとめました
ここでの ZooKeeper の役割は何でしょうか?これは、クラスター内でどのブローカーが実行されているか、どのトピックが作成されているか、各トピックにはいくつのパーティションがあるか、これらのパーティションのリーダー コピーがどのマシン上にあるかなど、Kafka クラスターのすべてのメタデータ情報を調整、管理、保存する役割を担う分散調整フレームワークです。 メッセージ送信形式純粋なバイナリバイトシーケンス。もちろん、メッセージは構造化されたままですが、使用する前にバイナリ バイト シーケンスに変換する必要があります。 メッセージ伝送プロトコル
メッセージ圧縮プロデューサー プログラムで compression.type パラメータを構成すると、指定されたタイプの圧縮アルゴリズムが有効になります。 props.put("compression.type", "gzip") は、このプロデューサーが使用する圧縮アルゴリズムが GZIP であることを示します。このように、プロデューサーの起動後に生成される各メッセージ セットは GZIP で圧縮されるため、Kafka ブローカー側のネットワーク転送帯域幅とディスク使用量を効果的に節約できます。 ただし、ブローカーが Snappy などの別の圧縮アルゴリズムを指定した場合、ブローカーはプロデューサーからのメッセージを解凍し、独自のアルゴリズムに従って再圧縮します。 圧縮アルゴリズムの比較: スループットの点では、LZ4 > Snappy > zstd および GZIP です。圧縮率で言えば、zstd > LZ4 > GZIP > Snappy です。 Kafka では、デフォルトでは圧縮アルゴリズムが指定されません。 メッセージの解凍コンシューマーがメッセージを取得すると、ブローカーはそれをそのまま送信します。メッセージがコンシューマーに到達すると、コンシューマーはメッセージを解凍し、以前のメッセージに復元します。 パーティショニング戦略org.apache.kafka.clients.Partitioner インターフェースを実装するクラスを記述します。 2 つの内部メソッド、partition() と close() を実装します。次に、プロデューサー側のパラメータpartitioner.classを明示的に設定します。 一般的な戦略:
プロデューサーはTCP接続を管理する新しい KafkaProducer インスタンスを作成すると、プロデューサー アプリケーションはバックグラウンドで Sender という名前のスレッドを作成して開始します。送信スレッドが実行を開始すると、最初にブローカーとの接続が作成されます。この時点では、メッセージがどのトピックに送信されるかは不明であるため、プロデューサーは起動時にすべてのブローカーとの接続を開始します。 プロデューサーは、metadata.max.age.ms パラメータを通じてメタデータ情報を定期的に更新します。デフォルト値は 300000 (5 分) です。クラスター側で変更があったかどうかに関係なく、プロデューサーは 5 分ごとにメタデータを強制的に更新し、最新のデータであることを確認します。 プロデューサーがメッセージを送信します:プロデューサーは、コールバック通知、producer.send(msg, callback) を含む送信 API を使用します。 acks = all を設定します。すべてのレプリカがメッセージを正常に受信し、メッセージが「コミット済み」であると見なされることを示す Producer のパラメータ。最高レベル、acksの他の値について説明します。 min.insync.replicas > 1 は、メッセージが「コミット済み」と見なされる前に書き込まれる必要があるレプリカの数を示します。 retries は Producer のパラメータです。瞬間的なネットワークジッタが発生すると、メッセージの送信が失敗する可能性があります。この時点で、再試行回数が 0 より大きい値に設定されたプロデューサーは、メッセージの損失を回避するために、メッセージの送信を自動的に再試行できます。 べき等プロデューサーパラメータ props.put("enable.idempotence", true) を設定すると、プロデューサーは自動的にべき等プロデューサーにアップグレードされます。その他のコード ロジックを変更する必要はありません。 Kafka は自動的にメッセージの重複を排除するのに役立ちます。 原理は非常に単純で、古典的なスペースと時間のトレード、つまりブローカー側でより多くのフィールドを節約するものです。プロデューサーが同じフィールド値を持つメッセージを送信すると、ブローカーはこれらのメッセージが重複していることを自動的に認識し、バックグラウンドでそれらを暗黙的に「破棄」できます。 メッセージの冪等性は、単一のパーティションと単一のセッションでのみ保証されます。べき等プロデューサーは、トピックのパーティションに重複したメッセージが表示されないようにすることができますが、複数のパーティションに対してべき等性を実現することはできません。たとえば、ポーリングを使用すると、次の送信でパーティションが変更され、解決されなくなります。 トランザクションプロデューサーメッセージが複数のパーティションにアトミックに書き込まれることを保証できます。すべてのメッセージが正常に書き込まれるか、またはすべてのメッセージが失敗します。パーティションとセッション全体でのべき等性を保証できます。
実際、書き込みが失敗した場合でも、Kafka は基礎となるログにメッセージを書き込むため、コンシューマーは引き続きこれらのメッセージを見ることができます。コンシューマー側で isolation.level 設定を処理するかどうか。このパラメータには 2 つの値があります。
Kafka Broker はどのようにデータを保存しますか?Kafka はメッセージ ログを使用してデータを保存します。ログは、メッセージの追加のみ (追加専用) できるディスク上の物理ファイルです。追加書き込みのみが可能なので、低速のランダム I/O 操作は回避され、パフォーマンスの優れたシーケンシャル I/O 書き込み操作に置き換えられます。これは、Kafka の高スループット特性を実現するための重要な手段でもあります。 ただし、ログにメッセージを書き込み続けると、最終的にはディスク領域がすべて使用されるため、Kafka は定期的にメッセージを削除してディスクを再利用する必要があります。削除するにはどうすればいいですか? 簡単に言えば、ログ セグメント メカニズムを介して行われます。 Kafka の最下層では、ログがさらに複数のログ セグメントに分割され、現在の最新のログ セグメントにメッセージが追加されます。ログ セグメントがいっぱいになると、Kafka は自動的に新しいログ セグメントを分割し、古いログ セグメントをシールします。 Kafka には、古いログ セグメントを削除できるかどうかを定期的にチェックし、ディスク領域を再利用するためのスケジュールされたタスクがバックグラウンドで実行されます。 Kafka のバックアップメカニズム同じデータを複数のマシンにコピーします。レプリカの数は設定可能です。 Kafka のフォローレプリカは外部サービスを提供しません。 レプリカの動作メカニズムもシンプルです。プロデューサーは常にリーダー レプリカにメッセージを書き込みます。コンシューマーは常にリーダーレプリカからメッセージを読み取ります。フォロワー コピーに関しては、1 つのことだけを実行します。つまり、リーダー コピーにプル リクエストを非同期的に送信し、リーダーに最新のメッセージを同期するように要求します。必ず、その中のデータがリーダーのデータと一致しなくなるか、リーダーより遅れる時間枠が存在する必要があります。 なぜ消費者団体を導入するのですか?主な目的は、消費者側のスループットを向上させることです。複数のコンシューマー インスタンスが同時に消費し、コンシューマー エンド全体のスループット (TPS) が向上します。 コンシューマー グループでは、パーティションを使用できるのは 1 人のコンシューマーのみですが、コンシューマーには複数のパーティションが割り当てられる場合があるため、変位を送信するときに、複数のパーティションの変位も送信できます。トピックに 2 つのパーティションがあり、コンシューマー グループに 3 つのコンシューマーがある場合、1 つのコンシューマーはどのパーティションにも割り当てられず、アイドル状態になります。 理想的には、コンシューマー インスタンスの数は、グループがトピックをサブスクライブするパーティションの合計数 (複数の可能性あり) と等しくなる必要があります。 コンシューマー側プル(バッチ)、ACKコンシューマーは最初にメッセージをプルして消費し、次に ack してオフセットを更新します。 1) コンシューマープログラムは複数のスレッドを開始します。各スレッドは専用の KafkaConsumer インスタンスを維持し、完全なメッセージのプルとメッセージ処理プロセスを担当します。 KafkaConsumer は 1 つのパーティションを担当し、パーティション内でのメッセージの消費順序を確保できます。 欠点: スレッドの数は、コンシューマーがトピックをサブスクライブするパーティションの合計数によって制限されます。 2) タスクは、メッセージ取得とメッセージ処理の 2 つの部分に分かれています。コンシューマー プログラムは、シングル スレッド メッセージまたはマルチ スレッド メッセージを使用してメッセージをプルし、ビジネス ロジックを実行するための専用のスレッド プールを作成します。利点: メッセージ取得とメッセージ処理のスレッド数を柔軟に調整できます。 欠点: パーティション内でのメッセージの消費順序は保証されません。さらに、複数のスレッド グループを導入すると、メッセージ消費リンク全体が長くなり、最終的には正しい変位を送信することが非常に困難になり、メッセージの重複消費や損失が発生する可能性があります。 消費者側のオフセット管理1) 古いバージョンの Consumer グループでは、ZooKeeper に変位を保存していましたが、ZooKeeper は頻繁な書き込み更新には適していないことがすぐに判明しました。 2) コンシューマー グループの新バージョンでは、Kafka コミュニティはコンシューマー グループの変位管理方法を再設計し、Kafka 自体が管理する「変位トピック」とも呼ばれるブローカー側の内部トピックに変位を保存する方法を採用しました。原理は非常に単純です。コンシューマーのオフセット データは、通常の Kafka メッセージとして __consumer_offsets に送信されます。メッセージ形式は Kafka 自体によって定義されており、ユーザーが変更することはできません。変位テーマのキーは主に 3 つの部分で構成されます。 Kafka Consumer が変位を送信するには、変位の自動送信と変位の手動送信の 2 つの方法があります。 Kafka は、トピックが無期限に拡張されるのを防ぐために、Compact 戦略を使用して、変位トピック内の期限切れのメッセージを削除します。圧縮するトピックを定期的に検査し、条件を満たす削除可能なデータがあるかどうかを確認するための専用のバックグラウンド スレッドが提供されます。 リバランスのトリガー条件1) グループメンバーの数が変わります。たとえば、新しい Consumer インスタンスがグループに参加したりグループから離脱したり、あるいは Consumer インスタンスがクラッシュしてグループから「追い出された」りします。 (99%の理由はこれによる) 2) 購読されているトピックの数が変更されます。コンシューマー グループは、正規表現を使用してトピックをサブスクライブできます。たとえば、consumer.subscribe(Pattern.compile("t.*c")) は、グループが文字 t で始まり文字 c で終わるすべてのトピックをサブスクライブすることを意味します。コンシューマー グループの操作中に、これらの条件を満たす新しいトピックを作成すると、グループのバランスが再調整されます。 3) サブスクライブされたトピックのパーティション数が変わります。 Kafka では現在、トピックのパーティション数を増やすことのみが可能です。パーティションの数が増えると、トピックをサブスクライブしているすべてのグループがトリガーされ、再バランス調整が開始されます。 メッセージの順序複数のパーティションを持つ Kafka の設計では、グローバルなメッセージの順序を保証できません。グローバル メッセージ順序を実装する必要がある場合は、単一のパーティションのみを使用できます。 方法2: キーでグループ化することで、同じキーを持つメッセージを同じパーティションに配置し、ローカル順序を確保します。 履歴データクリーニング戦略保持時間に基づくlog.retention.hours ログ サイズに基づいたクリーンアップ戦略。 log.retention.bytes によって制御されます 組み合わせ |
<<: JVM仮想マシンの全体構造とオブジェクトメモリ割り当ての分析
>>: 分散ストレージシステムの信頼性:システムの定量的評価
ウェブサイトのキーワードランキングは、主に強力なアンカーテキストリンクによって達成されます。検索エン...
1.CNドメイン名は5月29日に個人登録が開始されますAdmin5 Webmaster Networ...
みなさんこんにちは。私は徐子宇です。百度は先日、アルゴリズムのアップデートを発表しました。発表日より...
著者は現在、医療ウェブサイトを最適化しており、医療業界のウェブサイトについていくつかの意見を持ってい...
最近、Kingsoft CloudはPaaS層でRongCloudと深い戦略的協力関係を築き、両者は...
2月17日の海外メディアの報道によると、オンライン言語学習コミュニティのMemriseは本日、110...
この経済社会において、文学ウェブサイトは結局のところビジネスであり、文学ウェブサイトを運営する最終的...
Dawang Data の最新の春節プロモーション: (1) 米国 CN2+BGP 回線のハイエンド...
ヘンゴーストはどうですか? (恒創科技) 恒創は良いですか?香港と日本のデータセンターをテストした後...
私は最近、バイドゥの入札を説明するのが困難ですまた、これを深く理解していますここでは、Jiechen...
ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス今は、個人のウェブサイト...
01 デスクトップクラウドテクノロジー入門従来のオフィス端末の欠点デジタル経済の発展に伴い、データセ...
ブラウジングしていると、ultravps が特別価格の VPS 4 つ、クラウド サーバー 2 つ、...
インターネット マーケティングにおける長年の経験により、私は独自のインターネット マーケティング思考...
[[254590]] Kubernetes は今年、私のキャリアにとって非常に重要であり、新年も引き...