1. 概要 最近、カフカの消費とハートビートのメカニズムについて相談する学生もいます。今日は、このブログを通じて、これらの内容を一つずつ紹介していきたいと思います。 2. コンテンツ 2.1 Kafka の消費 まず、消費について見てみましょう。 Kafka は非常にシンプルな消費 API を提供します。ユーザーは、Kafka ブローカー サーバーのアドレスを初期化し、KafkaConsumer クラスをインスタンス化してトピック内のデータを取得するだけです。単純な Kafka 消費サンプルコードは次のとおりです。 - パブリッククラス JConsumerSubscribe は Thread を拡張します {
- 公共 静的void main(String[] args) { JConsumerSubscribe jconsumer = new JConsumerSubscribe(); jconsumer を起動します。 } /** Kafka クラスター情報を初期化します。 */ プライベートプロパティ構成() { Properties props = new Properties(); props.put( "bootstrap.servers" , "dn1:9092,dn2:9092,dn3:9092" );// Kafka クラスターのアドレスを指定します
- props.put( "group.id" , "ke" ); //消費者グループを指定する
- props.put( "enable.auto.commit" , "true" ); // 自動送信を有効にする
- props.put( "auto.commit.interval.ms" , "1000" ); // 自動送信の時間間隔
- // メッセージの主キーをデシリアライズします。props.put( "key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
- // 消費レコードをデシリアライズする props.put( "value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" );
- プロパティを返します。
- } /** シングルスレッドのコンシューマーを実装します。 */ @Override public void run() { // コンシューマーインスタンスオブジェクトを作成します。KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configure()); // 消費トピックコレクションをサブスクライブします。consumer.subscribe(Arrays.asList( "test_kafka_topic" ));
- // リアルタイム消費フラグ boolean flag = true ;
- while (フラグ) {
- // トピックメッセージデータを取得します。 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
- ( ConsumerRecord<String, String> レコード: レコード)
- // メッセージ レコードの印刷をループします。System. out .printf( "オフセット = %d、キー = %s、値 = %s%n" 、record.offset()、record.key ( )、record.value());
- } // 例外が発生した場合は、コンシューマー オブジェクトを閉じます。consumer.close ();
- }}
上記のコードを使用すると、トピック内のデータを簡単に取得できます。ただし、データを取得するために poll メソッドを呼び出すと、Kafka Broker Server がそれらの処理を実行します。次に、ソースコードの実装の詳細を見てみましょう。コアコードは次のとおりです。 org.apache.kafka.clients.consumer.KafkaConsumer - プライベート ConsumerRecords<K, V> ポーリング(最終ロングタイムアウトMs、最終ブール値 includeMetadataInTimeout) {
- 取得して確実に開く();試す {
- if (timeoutMs < 0) throw new IllegalArgumentException( "タイムアウトは負の値であってはなりません" );
- if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
- throw new IllegalStateException( "コンシューマーはトピックにサブスクライブされていないか、パーティションが割り当てられていません" );
- } //タイムアウトが経過するまで新しいデータをポーリングします
- 長い経過時間 = 0L;
- する {
- クライアントがトリガーウェイクアップを実行する可能性があります。最終的な長いメタデータ終了;メタデータタイムアウトを含める場合
- 最終的な長いmetadataStart = time .milliseconds(); if (!updateAssignmentMetadataIfNeeded(remainingTimeAtLeastZero(timeoutMs, elapsedTime))) {
- ConsumerRecords.empty()を返します。
- } metadataEnd = time .milliseconds();経過時間 += メタデータ終了 - メタデータ開始; }それ以外{
- (!updateAssignmentMetadataIfNeeded(Long.MAX_VALUE)) の間 {
- log.warn( "まだメタデータを待機しています" );
- } metadataEnd = time .milliseconds(); } 最終的な Map<TopicPartition、List<ConsumerRecord<K、V>>> records = pollForFetches(remainingTimeAtLeastZero(timeoutMs、elapsedTime));レコードが空の場合
- // 取得したレコードを返す前に、次の一連の取得を送信できます
- //ユーザーが応答を待つのをブロックしないようにして、パイプラインを有効にします
- //取得したレコードを処理します。
- //
- // 注意: 消費された位置はすでに更新されているため、
- // ウェイクアップまたは 事前に発生するその他のエラー 取得したレコードを返します。
- if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
- クライアントのポーリングNoWakeup(); } this.interceptors.onConsume(new ConsumerRecords<>(records))を返します。
- } 最終的な長い fetchEnd = time .milliseconds();経過時間 += フェッチ終了 - メタデータ終了; } 経過時間 < タイムアウト時間の場合;
- ConsumerRecords.empty()を返します。
- ついに
- リリース(); } }
上記のコードには pollForFetches メソッドがあり、その実装ロジックは次のとおりです。 - プライベート Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(final long timeoutMs) {
- 最終的な長いstartMs = time .milliseconds();
- long pollTimeout = Math.分(コーディネーター.timeToNextPoll(startMs), timeoutMs);
- // すでにデータが利用可能な場合は、すぐに返します
- 最終的な Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
- レコードが空の場合
- レコードを返します。
- }
- // 新しいフェッチを送信します(保留中のフェッチは再送信しません)
- フェッチャー.sendFetches();
- //ポジションが不足している場合、ポーリングでブロックされたままになるのは望ましくありません
- // オフセット検索が遅れている可能性があるため 失敗の後
- // 注意: cachedSubscriptionHashAllFetchPositionsを使用する場合は、
- // このメソッドの前に updateAssignmentMetadataIfNeeded を実行します。
- if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
- ポーリングタイムアウト = retryBackoffMs;
- }
- client.poll(pollTimeout, startMs, () -> {
- //フェッチはバックグラウンドスレッドによって完了する可能性があるため、このポーリング条件が必要です
- // poll()で不必要にブロックしないようにするため
- !fetcher.hasCompletedFetches()を返します。
- });
- //長いポーリングの後、グループの再バランス調整が必要かどうかを確認する必要があります
- //前 データを返してグループがより早く安定できるようにする
- コーディネーターの再参加が必要または保留中の場合(){
- Collections.emptyMap()を返します。
- }
- fetcher.fetchedRecords()を返します。
- }
上記のコードの太字部分から、コンシューマー クライアントがデータを取得するたびに、最初に poll メソッドを通じてフェッチャーの fetchedRecords 関数を呼び出すことがわかります。データが取得されない場合、新しい sendFetches 要求が開始されます。データを消費する場合、Kafka ブローカー サーバーから取得されるデータのバッチごとに最大データ量の制限があります。デフォルトは 500 レコードで、プロパティ (max.poll.records) によって制御されます。クライアントでこのプロパティ値を設定すると、消費するたびに取得されるデータの量を調整できます。 ヒント:ここで注意すべき点は、max.poll.records はポーリング要求の合計データを返すものであり、パーティションの数とは関係がないということです。したがって、各消費に対してすべてのパーティションから取得されるトピック データの合計数は、max.poll.records で設定された値を超えることはありません。 Fetcher クラスでは、sendFetches メソッドで取得されるデータの容量に制限があり、これはプロパティ (max.partition.fetch.bytes) によって設定され、デフォルトは 1 MB です。 max.partition.fetch.bytes 制限に達したときに、10,000 件のレコード (デフォルトでは 1 回につき 500 件) をフェッチする必要がある場合、今回はネットワーク経由で開始されたすべての要求のフェッチを完了するために 20 回実行する必要があるというシナリオが考えられます。 ここで、質問がある学生もいるかもしれません。デフォルトの max.poll.records プロパティ値を 10000 に調整することはできないでしょうか?はい、調整できますが、一緒に使用する必要がある別のプロパティがあります。これは各ポーリングのタイムアウトです(Duration.ofMillis(100))。ここでは、各データの実際の容量に基づいてタイムアウト設定を決定する必要があります。最大値を 10000 に調整すると、各レコードの容量が大きい場合、タイムアウトは依然として 100 ミリ秒のままとなり、取得されるデータは 10,000 未満になる可能性があります。 ここで注意が必要なもう 1 つの点は、セッション タイムアウトの問題です。 session.timeout.ms のデフォルト値は 10 秒、group.min.session.timeout.ms のデフォルト値は 6 秒、group.max.session.timeout.ms のデフォルト値は 30 分です。消費のビジネス ロジックを処理しているときに、10 秒以内に処理されない場合、コンシューマー クライアントは Kafka ブローカー サーバーから切断され、消費されたデータと生成されたオフセットは Kafka に送信されません。これは、Kafka ブローカー サーバーがコンシューマー プログラムが切断されたと認識するためです。 auto-commit プロパティまたは auto.offset.reset プロパティを設定した場合でも、消費時に重複した消費が発生します。これは、session.timeout.ms タイムアウトが発生するためです。 2.2 ハートビートのメカニズム 上記の最後に、セッション タイムアウトによってメッセージの消費が繰り返されるという説明がありました。タイムアウトが発生したのはなぜですか?学生の中には、このような疑問を持つ人もいるかもしれません。私のコンシューマー スレッドは明らかに開始されており、終了していません。 Kafka のメッセージを消費できないのはなぜですか?コンシューマー グループが ConsumerGroupID を見つけることができません。これはタイムアウトが原因である可能性があります。 Kafka はハートビート メカニズムを通じてタイムアウトを制御します。ハートビート メカニズムは、コンシューマー クライアントには認識されません。非同期スレッドです。コンシューマー インスタンスを開始すると、ハートビート スレッドが動作を開始します。 定期的にハートビートを送信し、コンシューマーのステータスを検出するために、org.apache.kafka.clients.consumer.internals.AbstractCoordinator で HeartbeatThread スレッドが開始されます。各コンシューマーには org.apache.kafka.clients.consumer.internals.ConsumerCoordinator があり、各 ConsumerCoordinator はハートビートを維持するために HeartbeatThread スレッドを開始します。ハートビート情報は、org.apache.kafka.clients.consumer.internals.Heartbeat に保存されます。宣言されたスキーマは次のとおりです。 - プライベート最終intセッションタイムアウトMs;
- プライベート最終intハートビート間隔Ms;
- プライベート最終int maxPollIntervalMs;
- プライベート最終長い retryBackoffMs;
- プライベート揮発性の長い最後のハートビート送信;
- プライベートロングラストハートビート受信;
- プライベート長いlastSessionReset;
- プライベートな長い最後の投票;
- プライベートブールハートビート失敗;
ハートビート スレッドの run メソッドの実装コードは次のとおりです。 - パブリックボイド実行(){
- 試す {
- log.debug( "ハートビートスレッドが開始されました" );
- (真)の間{
- 同期された (AbstractCoordinator.this) {
- (閉じている)
- 戻る;
- 有効になっている場合
- 抽象コーディネータ。this.wait();
- 続く;
- } 状態が MemberState.STABLE の場合 {
- //グループ は 安定していない(おそらくグループを離れたため) またはコーディネーターが
- //追い出されました) ので、ハートビートを無効にしてメインスレッドが再参加するのを待ちます。
- 無効にする();
- 続く;
- }
- クライアントのポーリングNoWakeup();
- long now = time .milliseconds();
- コーディネーターが不明の場合(){
- (findCoordinatorFuture != null || lookupCoordinator().failed()) の場合
- //直近の未来のチェックは、
- // ブローカーは利用可能 接続する に。
- AbstractCoordinator.this.wait(retryBackoffMs);
- }そうでない場合 (heartbeat.sessionTimeoutExpired(now)) {
- // ハートビートが成功しないままセッションタイムアウトが経過したので、
- // おそらくコーディネータがまだ正常であることを確認します。
- コーディネーター不明をマークします();
- }そうでない場合 (heartbeat.pollTimeoutExpired(now)) {
- // ポーリングタイムアウトが期限切れになったため、フォアグラウンドスレッドが停止したことを意味します
- //で poll()の呼び出しの間にグループを明示的に離脱します。
- グループを離れるかもしれない();
- }そうでない場合 (!heartbeat.shouldHeartbeat(now)) {
- // 再試行バックオフを待ってから再度ポーリングする 心拍が停止した場合や
- // コーディネータが切断されました
- AbstractCoordinator.this.wait(retryBackoffMs);
- }それ以外{
- heartbeat.sentHeartbeat(現在)
- sendHeartbeatRequest().addListener(新しいRequestFutureListener<Void>() {
- @オーバーライド
- パブリックvoid onSuccess(Void値) {
- 同期された (AbstractCoordinator.this) {
- heartbeat.receiveHeartbeat(時間.milliseconds());
- }
- }
- @オーバーライド
- パブリックvoid onFailure(RuntimeException e) {
- 同期された (AbstractCoordinator.this) {
- if (e インスタンス RebalanceInProgressException) {
- //有効です グループが鼓動を続ける間 バランス調整中です。
- // コーディネータがメンバーをグループ内に保持することを保証する のために 限り
- //再バランスのタイムアウトの期間として。心拍の送信を止めれば、
- // ただし、再参加する前にセッション タイムアウトが期限切れになる可能性があります。
- heartbeat.receiveHeartbeat(時間.milliseconds());
- }それ以外{
- ハートビート.failHeartbeat();
- //スレッドがスリープ状態の場合は、スレッドを起動してハートビートを再スケジュールします
- 抽象コーディネータ。this.notify();
- }
- }
- }
- });
- }
- }
- }
- } キャッチ (認証例外 e) {
- log.error( "ハートビートスレッドで認証エラーが発生しました" , e);
- this.failed.set (e);
- } キャッチ (GroupAuthorizationException e) {
- log.error( "ハートビートスレッドでグループ認証エラーが発生しました" , e);
- this.failed.set (e);
- } キャッチ (InterruptedException | InterruptException e) {
- スレッドが中断されました();
- log.error( "ハートビートスレッドで予期しない割り込みを受信しました" , e);
- this.failed.set (新しい RuntimeException(e));
- } キャッチ (Throwable e) {
- log.error( "予期しないエラーのためハートビートスレッドが失敗しました" , e);
- if (e インスタンスの RuntimeException)
- this.failed.set ( (RuntimeException) e);
- それ以外
- this.failed.set (新しい RuntimeException(e));
- ついに
- log.debug( "ハートビートスレッドが閉じられました" );
- }
- }
- コードを表示
ハートビート スレッドには、sessionTimeoutExpired と pollTimeoutExpired という 2 つの最も重要なタイムアウト関数があります。 - パブリックブール値 sessionTimeoutExpired(long now) {
- 今すぐ戻る- 数学。最大(lastSessionReset、lastHeartbeatReceive) > sessionTimeoutMs;
- } public boolean pollTimeoutExpired(long now) {
- 今戻ります- lastPoll > maxPollIntervalMs;
- }
2.2.1 セッションタイムアウト期限切れ sessionTimeout がタイムアウトすると、現在のコーディネーターが切断を処理しているとマークされます。このとき、コンシューマーは削除され、パーティションとコンシューマー間の対応が再割り当てされます。 Kafka ブローカー サーバーでは、コンシューマー グループは 5 つの状態 (Unknown が含まれる場合は 6 つ) を定義します (org.apache.kafka.common.ConsumerGroupState、次の図を参照)。 2.2.2 ポーリングタイムアウト期限切れ ポーリング タイムアウトがトリガーされると、コンシューマー クライアントは ConsumerGroup を終了します。再度ポーリングすると、ConsumerGroup に再参加し、RebalanceGroup がトリガーされます。 KafkaConsumer クライアントは繰り返しポーリングするのに役立たないため、実装された消費ロジックでポーリング メソッドを継続的に呼び出す必要があります。 3. パーティションとコンシューマスレッド コンシューマ パーティションとコンシューマ スレッドの対応に関しては、理論的にはコンシューマ スレッドの数はパーティションの数以下である必要があります。以前は、1 つのコンシューマ スレッドが 1 つのパーティションに対応し、コンシューマ スレッドの数がパーティションの数と等しい場合に、スレッドの使用率が最大化されるという見解がありました。 KafkaConsumer Client インスタンスを直接使用しても問題はありません。ただし、十分な CPU があれば、実際にはパーティション数よりも多くのスレッドを使用して消費容量を向上させることができます。これには、KafkaConsumer クライアントインスタンスを変更し、消費戦略の事前計算を実装し、追加の CPU を使用してより多くのスレッドを開始し、消費タスクのシャーディングを実装する必要があります。具体的な実装については次のブログで取り上げます。そこでは、「Kafka に基づく分散クエリ SQL エンジン」を紹介します。 4. 結論 このブログはここであなたと共有されます。研究や学習の過程で質問がある場合は、グループに参加して話し合うか、私にメールを送ってください。できる限りお答えいたします。お互いに励まし合いましょう! |