Kafka ベンダー向けのよくある面接の質問: 高パフォーマンスと高スループットを確保しながら高可用性を確保する

Kafka ベンダー向けのよくある面接の質問: 高パフォーマンスと高スループットを確保しながら高可用性を確保する

Kafka のメッセージ送信保証メカニズムは非常に直感的です。プロデューサーがブローカーにメッセージを送信すると、メッセージがコミットされると、レプリケーション メカニズムが存在するため、メッセージは失われません。ただし、プロデューサーがブローカーにデータを送信した後にネットワークの問題が発生し、通信が中断された場合、プロデューサーはメッセージがコミットされたかどうかを判断できません。 Kafka はネットワーク障害時に何が起こったのかを判断できませんが、プロデューサーはメッセージがブローカーに正しく送信されたことを確認するために複数回再試行できるため、Kafka は現在少なくとも 1 回は実装しています。

1. 冪等性

1. シナリオ

いわゆるべき等性とは、インターフェースへの複数の呼び出しの結果が単一の呼び出しの結果と同じであることを意味します。プロデューサーは再試行時にメッセージを繰り返し書き込む可能性がありますが、これは Kafka のべき等性機能を使用することで回避できます。

冪等性は条件付きです:

プロデューサーが単一セッション内で損失や重複が発生しないことを保証することのみが可能です。プロデューサーが予期せずクラッシュして再起動した場合、これは保証されません (べき等性の場合、以前の状態情報を取得することは不可能であるため、セッション間で損失や重複をゼロにすることは不可能です)。

冪等性は複数のトピック パーティションにまたがることはできず、単一のパーティション内でのみ冪等性を保証できます。複数のトピック パーティションが関係する場合、その間の状態は同期されません。

Producer でべき等性を使用する例は非常に単純です。 Producer の通常の使用方法とあまり変わりません。以下に示すように、Producer 構成の enable.idempotence を true に設定するだけです。

  1. プロパティ props = new Properties();
  2. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true" );
  3. props.put( "acks" , "all" ); // enable.idempotenceがtrueの場合、ここでのデフォルトはすべて  
  4. props.put( "bootstrap.servers" , "localhost:9092" );
  5. props.put( "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" );
  6. props.put( "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" );
  7.  
  8. KafkaProducer プロデューサー = new KafkaProducer(props);
  9.  
  10. プロデューサー.send(新しいプロデューサーレコード(トピック、 「テスト」 );

2. 事務

1. シナリオ

べき等性は複数のパーティション間では機能しませんが、トランザクションによってこの欠点を補うことができます。トランザクションにより、複数のパーティションへの書き込み操作のアトミック性が保証されます。操作の原子性とは、複数の操作がすべて成功するか、すべて失敗するかのいずれかであり、部分的な成功や部分的な失敗の可能性がないことを意味します。

トランザクションを実装するには、ネットワーク障害によって、クライアント プログラムを通じて設定される一意の transactionalId が提供される必要があります。

コードリポジトリを参照してください:

com.heima.kafka.chapter7.ProducerTransactionSend

  1. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG、トランザクションID);

2. 事前準備

トランザクションでは、プロデューサーが冪等性を有効にする必要があります。したがって、transactional.id パラメータを空でない値に設定してトランザクションを有効にする場合は、ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG も true に設定する必要があります (デフォルト値は true)。明示的に false に設定されている場合、例外がスローされます。

KafkaProducer は、次の 5 つのトランザクション関連のメソッドを提供します。

  1. //transactionalIdが設定されている場合は、トランザクションを初期化します
  2. パブリックvoid initTransactions()
  3. //トランザクションを開始
  4. パブリックボイド beginTransaction()
  5. // トランザクション内でコンシューマーに置換送信操作を提供する
  6. パブリックvoid sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> オフセット、String consumerGroupId)
  7. //トランザクションをコミットする
  8. パブリックボイドコミットトランザクション()
  9. //ロールバックと同様にトランザクションを終了します
  10. パブリックvoid abortTransaction()

3. 事例分析

コードリポジトリを参照してください:

  • com.heima.kafka.chapter7.ProducerTransactionSend

メッセージ送信者

  1. /**
  2. * Kafkaプロデューサートランザクションの使用
  3. */
  4. パブリッククラス ProducerTransactionSend {
  5. 公共 静的最終文字列トピック = "トピックトランザクション" ;
  6. 公共 静的最終文字列 brokerList = "localhost:9092" ;
  7. 公共 静的最終文字列 transactionId = "transactionId" ;
  8.      
  9. 公共 静的void main(String[] args) {
  10. プロパティ properties = new Properties();
  11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG、StringSerializer.class.getName());
  12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG、StringSerializer.class.getName());
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG、brokerList);
  14. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG、トランザクションID);
  15.          
  16. KafkaProducer<String, String> プロデューサー = 新しい KafkaProducer<> (プロパティ);
  17.          
  18. プロデューサー.initTransactions();
  19. プロデューサー.beginTransaction();
  20.          
  21. 試す {
  22. //ビジネスロジックを処理して ProducerRecord を作成する
  23. ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1" );
  24. プロデューサー.send(レコード1);
  25. ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2" );
  26. プロデューサー.send(レコード2);
  27. ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3" );
  28. プロデューサー.send(レコード3);
  29. //他のロジックを処理する
  30. プロデューサー.commitTransaction();
  31. } キャッチ (ProducerFencedException e) {
  32. プロデューサー.abortTransaction();
  33. }
  34. プロデューサー.close () ;
  35. }
  36. }

トランザクションロールバックケースのシミュレーション

  1. 試す {
  2. //ビジネスロジックを処理して ProducerRecord を作成する
  3. ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1" );
  4. プロデューサー.send(レコード1);
  5.      
  6. //トランザクションのロールバックケースをシミュレートする
  7. システム.out.println(1/0 ) ;
  8.      
  9. ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2" );
  10. プロデューサー.send(レコード2);
  11. ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3" );
  12. プロデューサー.send(レコード3);
  13. //他のロジックを処理する
  14. プロデューサー.commitTransaction();
  15. } キャッチ (ProducerFencedException e) {
  16. プロデューサー.abortTransaction();
  17. }

上記のケースでは、msg1 が正常に送信された後、異常なトランザクションが発生してロールバックされ、msg1 コンシューマーはメッセージを受信できません。

3. コントローラー

Kafka クラスターには 1 つ以上のブローカーがあり、そのうちの 1 つがコントローラー (Kafka コントローラー) として選出され、クラスター全体のすべてのパーティションとレプリカのステータスを管理する役割を担います。パーティションのリーダー レプリカに障害が発生した場合、コントローラーはパーティションの新しいリーダー レプリカを選択する責任を負います。パーティションの ISR セットで変更が検出されると、コントローラはすべてのブローカーにメタデータ情報を更新するように通知する役割を担います。 kafka-topics.sh スクリプトを使用してトピックのパーティション数を増やす場合でも、パーティションの再割り当てはコントローラーが担当します。

Kafka でのコントローラーの選択は Zookeeper に依存します。コントローラーに対して正常に実行されたブローカーは、Zookeeper に一時的な (EPHEMERAL) ノード /controller を作成します。この一時ノードの内容は次のとおりです。

1. ZooInspector 管理

管理には、Zookeeper グラフィカル クライアント ツール (ZooInspector) によって提供される jar を使用し、次のように起動します。

  1. jarが配置されているディレクトリを見つけます
  2. jarファイルjava -jar zookeeper-dev-ZooInspector.jarを実行します。
  3. Zookeeperへの接続

  1. { "バージョン" :1、 "ブローカーID" :0、 "タイムスタンプ" : "1529210278988" }

現在のバージョンでは、version は 1 に固定され、brokerid はコントローラーとして選出されたブローカーの ID 番号を示し、timestamp はコントローラーの選出が行われたタイムスタンプを示します。

クラスター内には常に 1 つのコントローラーのみが存在します。各ブローカーが起動すると、/controller ノードの brokerid 値を読み取ろうとします。読み取られた brokerid 値が -1 でない場合、別のブローカー ノードがコントローラーに対して正常に実行されたことを意味し、現在のブローカーは選出を放棄します。 Zookeeper に /controller ノードが存在しない場合、またはこのノードのデータが異常な場合は、/controller ノードを作成しようとします。現在のブローカーがノードを作成すると、他のブローカーも同時にこのノードを作成しようとする場合があります。ノードを正常に作成したブローカーのみがコントローラーになり、ノードの作成に失敗したブローカーは選出に失敗したことを意味します。各ブローカーは、現在のコントローラーの brokerid 値をメモリに保存します。これは、activeControllerId として識別できます。

Zookeeper のコントローラーに関連する /controller_epoch ノードもあります。このノードは永続的 (PERSISTENT) ノードであり、整数の controller_epoch 値を格納します。 controller_epoch は、コントローラーが変更された回数を記録するために使用されます。つまり、現在のコントローラーの世代を記録するために使用され、「コントローラー エポック」とも呼ばれます。

controller_epoch の初期値は 1 です。つまり、クラスター内の最初のコントローラーのエポックは 1 です。コントローラーが変更されると、新しいコントローラーが選択されなければ、このフィールドの値は 1 増加します。コントローラーと対話する各リクエストには、controller_epoch フィールドが含まれます。リクエストの controller_epoch 値がメモリ内の controller_epoch 値より小さい場合、リクエストは期限切れのコントローラに送信されたとみなされ、リクエストは無効とみなされます。要求された controller_epoch 値がメモリ内の controller_epoch 値より大きい場合、新しいコントローラが選択されたことを意味します。 Kafka は controller_epoch を使用してコントローラーの一意性を保証し、それによって関連する操作の一貫性を確保していることがわかります。

コントローラー ステータスを持つブローカーには、他の通常のブローカーよりも 1 つ多くの責任があります。詳細は以下の通りです。

  1. パーティション関連の変更を監視します。
  2. トピック関連の変更を監視します。
  3. ブローカー関連の変更を監視します。
  4. Zookeeper からトピック、パーティション、ブローカーに関連するすべての最新情報を読み取り、それに応じて管理します。

4. 信頼性の保証

  1. 信頼性保証: システムがさまざまな環境で一貫して動作することを保証する
  2. カフカの保証
  3. パーティション化されたメッセージの順序を保証する 同じプロデューサーを使用して同じパーティションにメッセージを書き込む場合、メッセージ B がメッセージ A の後に書き込まれると、Kafka はメッセージ B のオフセットがメッセージ A のオフセットよりも大きいことを保証し、コンシューマーはメッセージ B を読み取る前にメッセージ A を読み取ります。
  4. メッセージは、パーティション (ファイル システム キャッシュ) のすべての同期レプリカに書き込まれた場合にのみコミットされたと見なされます。
  5. プロデューサーは、パラメータacksを制御して、異なるタイプの確認を受け取るように選択できます。
  6. アクティブなレプリカがある限り、コミットされたメッセージは失われません。
  7. コンシューマーはコミットされたメッセージのみを読み取ることができます

1. 期限切れのコピー

パーティションに同期失敗状態のレプリカがあるかどうかを確認する方法を教えてください。 Kafka バージョン 0.9.x 以降では、単一のパラメータ replica.lag.time.max.ms によって制御されます (デフォルトのサイズは 10,000)。 ISR 内のフォロワー レプリカが、パラメーター replica.lag.time.max.ms で指定された値を超えてリーダー レプリカより遅れている場合、レプリカは無効とみなされ、このフォロワー レプリカを ISR から削除する必要があります。具体的な実装原理は非常にシンプルです。フォロワーコピーがリーダーコピーの LEO (Log End Offset、各パーティションの最後のメッセージの位置) より前のすべてのログを同期すると、フォロワーコピーがリーダーコピーに追いついたとみなされ、この時点でコピーの lastCaughtUpTimeMs フラグが更新されます。 Kafka の ReplicaManager が起動すると、レプリカの有効期限検出のためのスケジュールされたタスクが開始されます。このスケジュールされたタスクは、現在の時刻とレプリカの lastCaughtUpTimeMs の差が、パラメーター replica.lag.time.max.ms で指定された値より大きいかどうかを定期的にチェックします。フォロワー コピーがリーダー コピーからデータを取得する限り、lastCaughtUpTimeMs を更新すると誤解しないでください。リーダー コピーのメッセージ流入率がフォロワー コピーのプル率よりも大きい場合、フォロワー コピーはリーダー コピーからメッセージをプルし続け、リーダー コピーと同期できないとします。このフォロワー コピーが ISR に配置されている場合、リーダー コピーに障害が発生し、このフォロワー コピーが新しいリーダー コピーとして選択されると、重大なメッセージ損失が発生します。

2. コピー

Kafka の各トピック パーティションは n 回複製されます。ここで、n はトピックのレプリケーション係数です。これにより、クラスター サーバーに障害が発生した場合に Kafka がこれらのレプリカに自動的に切り替えられるため、障害発生時にもメッセージが引き続き利用可能になります。 Kafka のレプリケーションはパーティションの粒度に基づいており、パーティションの先行書き込みログは n 台のサーバーに複製されます。 n 個のレプリカのうち、1 つのレプリカがリーダーとして機能し、他のレプリカはフォロワーになります。名前が示すように、プロデューサーはリーダー パーティションにのみデータを書き込むことができ (読み取りはリーダー パーティションからのみ実行可能)、フォロワーはリーダーからログを順番にコピーすることのみが可能です。

レプリカがリーダーと同期しなくなる理由はいくつかあります。フォロワーが一定期間内にリーダーに追いつくことができない場合などです。最も一般的な理由の 1 つは、I/O ボトルネックにより、フォロワーがリーダーからプルするよりも遅い速度でレプリケーション メッセージを追加することです。スタックしたレプリカ: フォロワーは一定期間、リーダーからのリクエストのプルを停止します。フォロワー レプリカは、GC の一時停止、フォロワーの障害または停止により停止しています。

新しく開始されたレプリカ: ユーザーがトピックのレプリケーション係数を増やすと、新しいフォロワーは、リーダー ログに完全に追いつくまで、同期レプリカのリストに表示されません。

レプリカが遅れているかどうかを判断する方法:

  1. レプリカ.ラグ。最大メッセージ数= 4

サーバー側で設定する必要があるパラメータは、replica.lag.time.max.ms の 1 つだけです。このパラメータは、レプリカがパーティション リーダーに応答するまでの最大待機時間を説明します。スタックまたは失敗したレプリカをプローブします - レプリカが失敗し、プル リクエストの送信間隔が replica.lag.time.max.ms を超える場合。 Kafka はこのレプリカが無効であると見なし、同期されたレプリカのリストから削除します。遅いレプリカを検出するメカニズムが変更されました - レプリカがリーダーより replica.lag.time.max.ms 以上遅れ始めた場合。 Kafka はそれを遅すぎると判断し、同期レプリカのリストから削除します。リーダーへのレプリカ要求の間隔が replica.lag.time.max.ms より大きくない場合、リーダーはトラフィックの急増を引き起こし、メッセージを大きなバッチで書き込みます。 Kafka は同期中のレプリカのリストからレプリカを削除しません。

リーダーエポックリファレンス

データ損失のシナリオ

データの不整合のシナリオ

カフカ 0.11.0.0。バージョン ソリューション

上記の 2 つの問題の根本的な原因は、HW 値がレプリカ バックアップの成功を測定するために使用され、障害発生時にログを切り捨てる基準として使用されていることです。ただし、HW 値の更新は非同期的に遅延され、特に更新するには追加の FETCH 要求処理プロセスが必要になります。したがって、途中でクラッシュが発生すると、HW 値が期限切れになる可能性があります。これらの理由から、Kafka 0.11 では HW 値の代わりにリーダー エポックが導入されました。リーダー側は、リーダーのエポック情報を保存するための追加のメモリ領域を割り当てるため、上記 2 つのシナリオが発生した場合でも、これらの問題を十分に回避できます。

いわゆるリーダー エポックは、実際には値のペア (エポック、オフセット) です。エポックは、0 から始まるリーダーのバージョン番号を表します。リーダーが 1 回変更されると、エポックは +1 になり、オフセットは、このエポック バージョンのリーダーによって書き込まれた最初のメッセージの変位に対応します。次のような 2 つの値のペアがあるとします。

  • (0, 0)
  • (1、120)

これは、最初のリーダーがオフセット 0 からメッセージの書き込みを開始することを意味します。合計120件のメッセージ[0, 119]が書き込まれます。 2 番目のリーダーのバージョン番号は 1 で、オフセット 120 からメッセージの書き込みを開始します。

このようなキャッシュはリーダー ブローカーに保存され、定期的にチェックポイント ファイルに書き込まれます。

データ損失を回避する:

データの不整合を回避する

6. メッセージの重複に関するシナリオと解決策

1. プロデューサー側の重複

プロデューサーによって送信されたメッセージは正しいブレーク応答を受信できなかったため、プロデューサーは再試行します。

プロデューサーがメッセージを送信し、メッセージがディスクに書き込まれた後、送信者はネットワークなどのさまざまな理由により失敗またはネットワーク中断の応答を受信し、その後プロデューサーは回復可能な例外再試行メッセージを受信し、その結果、メッセージが重複します。

解決:

  • Kafkaのべき等性を開始する

Kafka のべき等性を有効にするために、コードを変更する必要はありません。デフォルトでは無効になっています。設定ファイルを変更する必要があります: enable.idempotence=true、require ack=all、retries>1。

  • ack=0、再試行なし。

メッセージが失われる可能性があります。これは、ログ収集など、データ損失よりもスループットが重要な状況に適しています。

消費者側の重複

根本的な原因

データが消費された後、オフセットがブローカーに時間内に送信されません。

解決

自動コミットをキャンセルする

消費が完了するたび、またはプログラムが終了するたびに手動で送信します。重複が保証されるわけではありません。

下流冪等性

一般的な解決策は、ダウンストリームをべき等にするか、消費される各メッセージのオフセットを記録することです。いくつかの厳密なシナリオでは、正確な更新を保証するために、注文 ID や下流ステータス更新などのオフセットまたは一意の ID を同じデータベース トランザクションに配置するか、消費オフセットを同時に下流データ テーブルに記録し、その後、下流データを更新するときに消費サイトを使用して楽観的ロックを実行して、古いサイトでのデータ更新を拒否する必要がある場合があります。

7. __​​コンシューマーオフセット

_consumer_offsets は、ユーザーに対して透過的な内部トピックです。データ ファイルとログに時々表示される以外、ユーザーは一般にこのトピックを認識していません。しかし、新しいバージョンの Kafka コンシューマーの変位情報が格納されていることはわかっています。

1. いつ作成されましたか?

一般に、トピック __consumer_offsets は、最初のコンシューマーがクラスター内のメッセージを消費するときに自動的に作成されます。パーティションの数は、offsets.topic.num.partitions パラメータで設定できます。デフォルト値は、以下に示すように 50 です。

2. パーティションの解析

コードリポジトリを参照してください:

  1. com.heima.kafka.chapter7.消費者オフセット分析

すべてのパーティションを取得します。

要約する

この章では、主に冪等性やトランザクション処理などの Kafka 関連の安定性操作について説明します。また、信頼性と一貫性の保証、メッセージの重複と解決策についても説明します。

<<:  Kube-vip を使用して高可用性の Kubernetes クラスターを構築する (フル バージョン)

>>:  アンチャンがKCSP資格を取得し、クラウドネイティブの強みが再び国際的に認められる

推薦する

ウェブサイトの最適化と検索エンジンの最適化の関係

ウェブサイトの最適化には、ユーザーの最適化、検索エンジンの最適化、ウェブサイトの運用と保守の最適化と...

化粧品タオバオストアのプロモーション戦略分析

どれくらいの人が Taobao ストアを運営し始めたのかはわかりません。私の友人や同僚の多くが Ta...

グローバルB2C電子商取引サイトの革新的モデルと関連事例の研究

The Inspirations of Business は、ビジネス イノベーションのトレンドを研...

Dynamics 365の中国上陸で発表された情報の徹底分析

[51CTO.comよりオリジナル記事] 中国市場で唯一の国際パブリッククラウドを展開する21Via...

山大文学のトップ経営陣交代の噂が現実に、CEOの侯小強が辞任

【侯小強は数ヶ月間休職している。彼は何度も「健康上の理由で辞めた」と強調し、陳天橋に不満はないと述べ...

製品設計の 2 つの側面: ユーザーの習慣を導くべきか、それともそれに従うべきか?

[編集者注] この記事はIT Migrant Workerから転載したものです。この問題は、ユーザー...

新しいサイトの迅速な追加に関する中核的な操作についての簡単な説明

新しいウェブサイトを検索エンジンに素早くインデックスさせるにはどうすればよいでしょうか。これは、多く...

セキュリティ機器ベンダーはクラウドコンピューティングブームに苦戦している

クラウド サービス市場が活発化するにつれ、従来のセキュリティ製品ベンダーはすべてクラウド戦略の構築に...

百度の重みを高めるために新しいサイトが注意すべきいくつかのポイントを簡単に分析する

現在、中国の検索エンジン業界は百度が独占しています。特にGoogleが中国市場から撤退した後、百度は...

X86 サーバー仮想化のリソース割り当てとパフォーマンス最適化の詳細な説明

仮想化は、通常、実際のベースではなく仮想ベースで実行されるコンピューティング要素を指す広い用語です。...

budgetvm-新しいVPS料金システム/OPENVZ/XEN/SSD/4コンピュータルーム

budgetvm が Web サイトを完全に刷新した後、価格体系も完全に変更されました。budget...

ハードウェアの観点から見たエッジコンピューティングとは何ですか?

IoT および IIoT デバイスによって生成される膨大な量のデータにより、エッジ コンピューティン...

短い動画が人気になるにつれ、従来の SEO マーケティング モデルはどうなるのでしょうか?

過去2年間で、ショートビデオは急速に発展し、ユーザー規模も急速に拡大したため、ショートビデオは非常に...

人気の大学入試イベントから学ぶオンラインマーケティングの新たな手法

毎年、大学入試では、昨年の早期提出、受験生のスキャンダルなど、いくつかのホットな出来事が起こりますが...

SEO における「必要のない」10 のこと

SEO について一定期間学習した後、多くのウェブマスターはよくある誤解を実践し始めます。 1. タイ...