Kafka ベンダー向けのよくある面接の質問: 高パフォーマンスと高スループットを確保しながら高可用性を確保する

Kafka ベンダー向けのよくある面接の質問: 高パフォーマンスと高スループットを確保しながら高可用性を確保する

Kafka のメッセージ送信保証メカニズムは非常に直感的です。プロデューサーがブローカーにメッセージを送信すると、メッセージがコミットされると、レプリケーション メカニズムが存在するため、メッセージは失われません。ただし、プロデューサーがブローカーにデータを送信した後にネットワークの問題が発生し、通信が中断された場合、プロデューサーはメッセージがコミットされたかどうかを判断できません。 Kafka はネットワーク障害時に何が起こったのかを判断できませんが、プロデューサーはメッセージがブローカーに正しく送信されたことを確認するために複数回再試行できるため、Kafka は現在少なくとも 1 回は実装しています。

1. 冪等性

1. シナリオ

いわゆるべき等性とは、インターフェースへの複数の呼び出しの結果が単一の呼び出しの結果と同じであることを意味します。プロデューサーは再試行時にメッセージを繰り返し書き込む可能性がありますが、これは Kafka のべき等性機能を使用することで回避できます。

冪等性は条件付きです:

プロデューサーが単一セッション内で損失や重複が発生しないことを保証することのみが可能です。プロデューサーが予期せずクラッシュして再起動した場合、これは保証されません (べき等性の場合、以前の状態情報を取得することは不可能であるため、セッション間で損失や重複をゼロにすることは不可能です)。

冪等性は複数のトピック パーティションにまたがることはできず、単一のパーティション内でのみ冪等性を保証できます。複数のトピック パーティションが関係する場合、その間の状態は同期されません。

Producer でべき等性を使用する例は非常に単純です。 Producer の通常の使用方法とあまり変わりません。以下に示すように、Producer 構成の enable.idempotence を true に設定するだけです。

  1. プロパティ props = new Properties();
  2. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true" );
  3. props.put( "acks" , "all" ); // enable.idempotenceがtrueの場合、ここでのデフォルトはすべて  
  4. props.put( "bootstrap.servers" , "localhost:9092" );
  5. props.put( "key.serializer" "org.apache.kafka.common.serialization.StringSerializer" );
  6. props.put( "value.serializer" "org.apache.kafka.common.serialization.StringSerializer" );
  7.  
  8. KafkaProducer プロデューサー = new KafkaProducer(props);
  9.  
  10. プロデューサー.send(新しいプロデューサーレコード(トピック、 「テスト」 );

2. 事務

1. シナリオ

べき等性は複数のパーティション間では機能しませんが、トランザクションによってこの欠点を補うことができます。トランザクションにより、複数のパーティションへの書き込み操作のアトミック性が保証されます。操作の原子性とは、複数の操作がすべて成功するか、すべて失敗するかのいずれかであり、部分的な成功や部分的な失敗の可能性がないことを意味します。

トランザクションを実装するには、ネットワーク障害によって、クライアント プログラムを通じて設定される一意の transactionalId が提供される必要があります。

コードリポジトリを参照してください:

com.heima.kafka.chapter7.ProducerTransactionSend

  1. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG、トランザクションID);

2. 事前準備

トランザクションでは、プロデューサーが冪等性を有効にする必要があります。したがって、transactional.id パラメータを空でない値に設定してトランザクションを有効にする場合は、ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG も true に設定する必要があります (デフォルト値は true)。明示的に false に設定されている場合、例外がスローされます。

KafkaProducer は、次の 5 つのトランザクション関連のメソッドを提供します。

  1. //transactionalIdが設定されている場合は、トランザクションを初期化します
  2. パブリックvoid initTransactions()
  3. //トランザクションを開始
  4. パブリックボイド beginTransaction()
  5. // トランザクション内でコンシューマーに置換送信操作を提供する
  6. パブリックvoid sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> オフセット、String consumerGroupId)
  7. //トランザクションをコミットする
  8. パブリックボイドコミットトランザクション()
  9. //ロールバックと同様にトランザクションを終了します
  10. パブリックvoid abortTransaction()

3. 事例分析

コードリポジトリを参照してください:

  • com.heima.kafka.chapter7.ProducerTransactionSend

メッセージ送信者

  1. /**
  2. * Kafkaプロデューサートランザクションの使用
  3. */
  4. パブリッククラス ProducerTransactionSend {
  5. 公共 静的最終文字列トピック = "トピックトランザクション" ;
  6. 公共 静的最終文字列 brokerList = "localhost:9092" ;
  7. 公共 静的最終文字列 transactionId = "transactionId" ;
  8.      
  9. 公共 静的void main(String[] args) {
  10. プロパティ properties = new Properties();
  11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG、StringSerializer.class.getName());
  12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG、StringSerializer.class.getName());
  13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG、brokerList);
  14. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG、トランザクションID);
  15.          
  16. KafkaProducer<String, String> プロデューサー = 新しい KafkaProducer<> (プロパティ);
  17.          
  18. プロデューサー.initTransactions();
  19. プロデューサー.beginTransaction();
  20.          
  21. 試す {
  22. //ビジネスロジックを処理して ProducerRecord を作成する
  23. ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1" );
  24. プロデューサー.send(レコード1);
  25. ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2" );
  26. プロデューサー.send(レコード2);
  27. ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3" );
  28. プロデューサー.send(レコード3);
  29. //他のロジックを処理する
  30. プロデューサー.commitTransaction();
  31. } キャッチ (ProducerFencedException e) {
  32. プロデューサー.abortTransaction();
  33. }
  34. プロデューサー.close () ;
  35. }
  36. }

トランザクションロールバックケースのシミュレーション

  1. 試す {
  2. //ビジネスロジックを処理して ProducerRecord を作成する
  3. ProducerRecord<String, String> record1 = new ProducerRecord<>(topic, "msg1" );
  4. プロデューサー.send(レコード1);
  5.      
  6. //トランザクションのロールバックケースをシミュレートする
  7. システム.out.println(1/0 ) ;
  8.      
  9. ProducerRecord<String, String> record2 = new ProducerRecord<>(topic, "msg2" );
  10. プロデューサー.send(レコード2);
  11. ProducerRecord<String, String> record3 = new ProducerRecord<>(topic, "msg3" );
  12. プロデューサー.send(レコード3);
  13. //他のロジックを処理する
  14. プロデューサー.commitTransaction();
  15. } キャッチ (ProducerFencedException e) {
  16. プロデューサー.abortTransaction();
  17. }

上記のケースでは、msg1 が正常に送信された後、異常なトランザクションが発生してロールバックされ、msg1 コンシューマーはメッセージを受信できません。

3. コントローラー

Kafka クラスターには 1 つ以上のブローカーがあり、そのうちの 1 つがコントローラー (Kafka コントローラー) として選出され、クラスター全体のすべてのパーティションとレプリカのステータスを管理する役割を担います。パーティションのリーダー レプリカに障害が発生した場合、コントローラーはパーティションの新しいリーダー レプリカを選択する責任を負います。パーティションの ISR セットで変更が検出されると、コントローラはすべてのブローカーにメタデータ情報を更新するように通知する役割を担います。 kafka-topics.sh スクリプトを使用してトピックのパーティション数を増やす場合でも、パーティションの再割り当てはコントローラーが担当します。

Kafka でのコントローラーの選択は Zookeeper に依存します。コントローラーに対して正常に実行されたブローカーは、Zookeeper に一時的な (EPHEMERAL) ノード /controller を作成します。この一時ノードの内容は次のとおりです。

1. ZooInspector 管理

管理には、Zookeeper グラフィカル クライアント ツール (ZooInspector) によって提供される jar を使用し、次のように起動します。

  1. jarが配置されているディレクトリを見つけます
  2. jarファイルjava -jar zookeeper-dev-ZooInspector.jarを実行します。
  3. Zookeeperへの接続

  1. { "バージョン" :1、 "ブローカーID" :0、 "タイムスタンプ" : "1529210278988" }

現在のバージョンでは、version は 1 に固定され、brokerid はコントローラーとして選出されたブローカーの ID 番号を示し、timestamp はコントローラーの選出が行われたタイムスタンプを示します。

クラスター内には常に 1 つのコントローラーのみが存在します。各ブローカーが起動すると、/controller ノードの brokerid 値を読み取ろうとします。読み取られた brokerid 値が -1 でない場合、別のブローカー ノードがコントローラーに対して正常に実行されたことを意味し、現在のブローカーは選出を放棄します。 Zookeeper に /controller ノードが存在しない場合、またはこのノードのデータが異常な場合は、/controller ノードを作成しようとします。現在のブローカーがノードを作成すると、他のブローカーも同時にこのノードを作成しようとする場合があります。ノードを正常に作成したブローカーのみがコントローラーになり、ノードの作成に失敗したブローカーは選出に失敗したことを意味します。各ブローカーは、現在のコントローラーの brokerid 値をメモリに保存します。これは、activeControllerId として識別できます。

Zookeeper のコントローラーに関連する /controller_epoch ノードもあります。このノードは永続的 (PERSISTENT) ノードであり、整数の controller_epoch 値を格納します。 controller_epoch は、コントローラーが変更された回数を記録するために使用されます。つまり、現在のコントローラーの世代を記録するために使用され、「コントローラー エポック」とも呼ばれます。

controller_epoch の初期値は 1 です。つまり、クラスター内の最初のコントローラーのエポックは 1 です。コントローラーが変更されると、新しいコントローラーが選択されなければ、このフィールドの値は 1 増加します。コントローラーと対話する各リクエストには、controller_epoch フィールドが含まれます。リクエストの controller_epoch 値がメモリ内の controller_epoch 値より小さい場合、リクエストは期限切れのコントローラに送信されたとみなされ、リクエストは無効とみなされます。要求された controller_epoch 値がメモリ内の controller_epoch 値より大きい場合、新しいコントローラが選択されたことを意味します。 Kafka は controller_epoch を使用してコントローラーの一意性を保証し、それによって関連する操作の一貫性を確保していることがわかります。

コントローラー ステータスを持つブローカーには、他の通常のブローカーよりも 1 つ多くの責任があります。詳細は以下の通りです。

  1. パーティション関連の変更を監視します。
  2. トピック関連の変更を監視します。
  3. ブローカー関連の変更を監視します。
  4. Zookeeper からトピック、パーティション、ブローカーに関連するすべての最新情報を読み取り、それに応じて管理します。

4. 信頼性の保証

  1. 信頼性保証: システムがさまざまな環境で一貫して動作することを保証する
  2. カフカの保証
  3. パーティション化されたメッセージの順序を保証する 同じプロデューサーを使用して同じパーティションにメッセージを書き込む場合、メッセージ B がメッセージ A の後に書き込まれると、Kafka はメッセージ B のオフセットがメッセージ A のオフセットよりも大きいことを保証し、コンシューマーはメッセージ B を読み取る前にメッセージ A を読み取ります。
  4. メッセージは、パーティション (ファイル システム キャッシュ) のすべての同期レプリカに書き込まれた場合にのみコミットされたと見なされます。
  5. プロデューサーは、パラメータacksを制御して、異なるタイプの確認を受け取るように選択できます。
  6. アクティブなレプリカがある限り、コミットされたメッセージは失われません。
  7. コンシューマーはコミットされたメッセージのみを読み取ることができます

1. 期限切れのコピー

パーティションに同期失敗状態のレプリカがあるかどうかを確認する方法を教えてください。 Kafka バージョン 0.9.x 以降では、単一のパラメータ replica.lag.time.max.ms によって制御されます (デフォルトのサイズは 10,000)。 ISR 内のフォロワー レプリカが、パラメーター replica.lag.time.max.ms で指定された値を超えてリーダー レプリカより遅れている場合、レプリカは無効とみなされ、このフォロワー レプリカを ISR から削除する必要があります。具体的な実装原理は非常にシンプルです。フォロワーコピーがリーダーコピーの LEO (Log End Offset、各パーティションの最後のメッセージの位置) より前のすべてのログを同期すると、フォロワーコピーがリーダーコピーに追いついたとみなされ、この時点でコピーの lastCaughtUpTimeMs フラグが更新されます。 Kafka の ReplicaManager が起動すると、レプリカの有効期限検出のためのスケジュールされたタスクが開始されます。このスケジュールされたタスクは、現在の時刻とレプリカの lastCaughtUpTimeMs の差が、パラメーター replica.lag.time.max.ms で指定された値より大きいかどうかを定期的にチェックします。フォロワー コピーがリーダー コピーからデータを取得する限り、lastCaughtUpTimeMs を更新すると誤解しないでください。リーダー コピーのメッセージ流入率がフォロワー コピーのプル率よりも大きい場合、フォロワー コピーはリーダー コピーからメッセージをプルし続け、リーダー コピーと同期できないとします。このフォロワー コピーが ISR に配置されている場合、リーダー コピーに障害が発生し、このフォロワー コピーが新しいリーダー コピーとして選択されると、重大なメッセージ損失が発生します。

2. コピー

Kafka の各トピック パーティションは n 回複製されます。ここで、n はトピックのレプリケーション係数です。これにより、クラスター サーバーに障害が発生した場合に Kafka がこれらのレプリカに自動的に切り替えられるため、障害発生時にもメッセージが引き続き利用可能になります。 Kafka のレプリケーションはパーティションの粒度に基づいており、パーティションの先行書き込みログは n 台のサーバーに複製されます。 n 個のレプリカのうち、1 つのレプリカがリーダーとして機能し、他のレプリカはフォロワーになります。名前が示すように、プロデューサーはリーダー パーティションにのみデータを書き込むことができ (読み取りはリーダー パーティションからのみ実行可能)、フォロワーはリーダーからログを順番にコピーすることのみが可能です。

レプリカがリーダーと同期しなくなる理由はいくつかあります。フォロワーが一定期間内にリーダーに追いつくことができない場合などです。最も一般的な理由の 1 つは、I/O ボトルネックにより、フォロワーがリーダーからプルするよりも遅い速度でレプリケーション メッセージを追加することです。スタックしたレプリカ: フォロワーは一定期間、リーダーからのリクエストのプルを停止します。フォロワー レプリカは、GC の一時停止、フォロワーの障害または停止により停止しています。

新しく開始されたレプリカ: ユーザーがトピックのレプリケーション係数を増やすと、新しいフォロワーは、リーダー ログに完全に追いつくまで、同期レプリカのリストに表示されません。

レプリカが遅れているかどうかを判断する方法:

  1. レプリカ.ラグ。最大メッセージ数= 4

サーバー側で設定する必要があるパラメータは、replica.lag.time.max.ms の 1 つだけです。このパラメータは、レプリカがパーティション リーダーに応答するまでの最大待機時間を説明します。スタックまたは失敗したレプリカをプローブします - レプリカが失敗し、プル リクエストの送信間隔が replica.lag.time.max.ms を超える場合。 Kafka はこのレプリカが無効であると見なし、同期されたレプリカのリストから削除します。遅いレプリカを検出するメカニズムが変更されました - レプリカがリーダーより replica.lag.time.max.ms 以上遅れ始めた場合。 Kafka はそれを遅すぎると判断し、同期レプリカのリストから削除します。リーダーへのレプリカ要求の間隔が replica.lag.time.max.ms より大きくない場合、リーダーはトラフィックの急増を引き起こし、メッセージを大きなバッチで書き込みます。 Kafka は同期中のレプリカのリストからレプリカを削除しません。

リーダーエポックリファレンス

データ損失のシナリオ

データの不整合のシナリオ

カフカ 0.11.0.0。バージョン ソリューション

上記の 2 つの問題の根本的な原因は、HW 値がレプリカ バックアップの成功を測定するために使用され、障害発生時にログを切り捨てる基準として使用されていることです。ただし、HW 値の更新は非同期的に遅延され、特に更新するには追加の FETCH 要求処理プロセスが必要になります。したがって、途中でクラッシュが発生すると、HW 値が期限切れになる可能性があります。これらの理由から、Kafka 0.11 では HW 値の代わりにリーダー エポックが導入されました。リーダー側は、リーダーのエポック情報を保存するための追加のメモリ領域を割り当てるため、上記 2 つのシナリオが発生した場合でも、これらの問題を十分に回避できます。

いわゆるリーダー エポックは、実際には値のペア (エポック、オフセット) です。エポックは、0 から始まるリーダーのバージョン番号を表します。リーダーが 1 回変更されると、エポックは +1 になり、オフセットは、このエポック バージョンのリーダーによって書き込まれた最初のメッセージの変位に対応します。次のような 2 つの値のペアがあるとします。

  • (0, 0)
  • (1、120)

これは、最初のリーダーがオフセット 0 からメッセージの書き込みを開始することを意味します。合計120件のメッセージ[0, 119]が書き込まれます。 2 番目のリーダーのバージョン番号は 1 で、オフセット 120 からメッセージの書き込みを開始します。

このようなキャッシュはリーダー ブローカーに保存され、定期的にチェックポイント ファイルに書き込まれます。

データ損失を回避する:

データの不整合を回避する

6. メッセージの重複に関するシナリオと解決策

1. プロデューサー側の重複

プロデューサーによって送信されたメッセージは正しいブレーク応答を受信できなかったため、プロデューサーは再試行します。

プロデューサーがメッセージを送信し、メッセージがディスクに書き込まれた後、送信者はネットワークなどのさまざまな理由により失敗またはネットワーク中断の応答を受信し、その後プロデューサーは回復可能な例外再試行メッセージを受信し、その結果、メッセージが重複します。

解決:

  • Kafkaのべき等性を開始する

Kafka のべき等性を有効にするために、コードを変更する必要はありません。デフォルトでは無効になっています。設定ファイルを変更する必要があります: enable.idempotence=true、require ack=all、retries>1。

  • ack=0、再試行なし。

メッセージが失われる可能性があります。これは、ログ収集など、データ損失よりもスループットが重要な状況に適しています。

消費者側の重複

根本的な原因

データが消費された後、オフセットがブローカーに時間内に送信されません。

解決

自動コミットをキャンセルする

消費が完了するたび、またはプログラムが終了するたびに手動で送信します。重複が保証されるわけではありません。

下流冪等性

一般的な解決策は、ダウンストリームをべき等にするか、消費される各メッセージのオフセットを記録することです。いくつかの厳密なシナリオでは、正確な更新を保証するために、注文 ID や下流ステータス更新などのオフセットまたは一意の ID を同じデータベース トランザクションに配置するか、消費オフセットを同時に下流データ テーブルに記録し、その後、下流データを更新するときに消費サイトを使用して楽観的ロックを実行して、古いサイトでのデータ更新を拒否する必要がある場合があります。

7. __​​コンシューマーオフセット

_consumer_offsets は、ユーザーに対して透過的な内部トピックです。データ ファイルとログに時々表示される以外、ユーザーは一般にこのトピックを認識していません。しかし、新しいバージョンの Kafka コンシューマーの変位情報が格納されていることはわかっています。

1. いつ作成されましたか?

一般に、トピック __consumer_offsets は、最初のコンシューマーがクラスター内のメッセージを消費するときに自動的に作成されます。パーティションの数は、offsets.topic.num.partitions パラメータで設定できます。デフォルト値は、以下に示すように 50 です。

2. パーティションの解析

コードリポジトリを参照してください:

  1. com.heima.kafka.chapter7.消費者オフセット分析

すべてのパーティションを取得します。

要約する

この章では、主に冪等性やトランザクション処理などの Kafka 関連の安定性操作について説明します。また、信頼性と一貫性の保証、メッセージの重複と解決策についても説明します。

<<:  Kube-vip を使用して高可用性の Kubernetes クラスターを構築する (フル バージョン)

>>:  アンチャンがKCSP資格を取得し、クラウドネイティブの強みが再び国際的に認められる

推薦する

fatcow-無制限のウェブサイトホスティング/無料ドメイン名/85% オフ

fatcow からメールでプロモーション情報を受け取りました。無制限の Web サイト構築と仮想ホス...

検索エンジンによってウェブサイトが「誤って破壊される」問題を解決する方法について説明します

多くのウェブマスターは、ウェブサイトのランキングが突然消える状況に遭遇しています。この場合、ウェブマ...

Liudurenhe(EC)の創設者、張興良氏:起業家はSaaSイノベーションを行う際に、この3つの点に注意する必要があります。

2017年11月30日、創業邦主催の中国企業サービスサミットが北京国家会議センターで盛大に開催されま...

#11.11# 馬華クラウド: 香港クラウドサーバー 1.9% オフ、月額 30 元、2G メモリ/2 コア/40gSSD/2M 帯域幅/cn2 gia ネットワーク

馬華クラウドの毎年恒例の「双十一」イベントも始まりました。香港クラウドCN2回線と安徽モバイルバック...

クラウドネイティブデータミドルプラットフォームの技術とトレンドの解説

データミドルプラットフォームの開発は、一般的に、データベース、データウェアハウス、ビッグデータプラッ...

Azure Kubernetes 構築シナリオ アプリケーションに関するワンストップ ディスカッション

私たちの業界は、Docker、Docker Compose、Kubernetes などのテクノロジー...

クラウドデータベースの選択では「CAP定理」は避けられない?

1980 年代初頭と比較すると、今日のデータベース技術は大きく進歩しました。ハードウェアの選択に関し...

レポート: モバイル トラフィックの増加により Google 有料クリック数が減少

北京時間10月22日、Adobeは最新の第3四半期インターネット広告データレポートで、モバイルトラフ...

2021年春節紅包合戦を通して中国のAIインフラ構築を見る

2015年以来、紅包は毎年の春節の美しい一面となっています。アリババを代表として、テンセント、百度、...

t667: 三網cn2 giaネットワーク、安昌コンピュータルームKVMシリーズVPSの20%割引、毎月3回のIP変更無料

t667 (olvps) は、米国ロサンゼルスの安昌コンピュータルームで 200Mbps の帯域幅を...

ソニーPS4の中国版はリージョンロックされていないとユーザーから報告

ソニーのPS4の中国での発売日が近づくにつれ、このデバイスがリージョンロックされていないというニュー...

Cloudive 簡単レビュー - [メモリ2g/月額7ドル]

Gongyi は、シカゴにデータセンターがある Cloudive から特別版の 2G メモリ KVM...

ハイブリッドクラウド: パブリッククラウドとプライベートクラウドのバランスをとる方法

クラウドでのデータの保存と処理にかかるコストと効率性への注目が高まるにつれ、多くの企業が業務をクラウ...

Open NOS によるクラウドとデータセンターの再構成の習得

企業は、データセンターの運用コストを削減しながら、生産性、ビジネスの回復力、持続可能性を向上させるこ...

これはすべて本当ですか?クアッドコア HD4870 X4 シングルカードイメージ

少し前に、私たちはネットユーザーに多くの「偽の」グラフィック カードを公開しました。 PS マスター...