1. はじめに前回の記事では、Kafka のアーキテクチャモデルについて詳しく紹介しました。クラスター環境では、Kafka はパーティションの数を設定することでデータの消費を高速化できます。 理論を知っているだけでは十分ではなく、実際に実践する必要があります。 次回は、本番環境の実例とSpringBoot技術フレームワークをもとに、Kafkaの使い方と高いデータスループットを実現する方法を紹介します! 2. 手順の練習最近、同社のビッグデータチームは、顧客の注文データを毎朝計算し、パフォーマンスデータを私たちにプッシュして、営業スタッフが毎日昨日のパフォーマンスデータを確認できるようにしています。データ量は約1000万です。以下は私のドッキングプロセスです。 2.1. kafka依存パッケージを追加するこのプロジェクトの SpringBoot バージョンは 2.1.5.RELEASE で、依存する kafka バージョンは 2.2.6.RELEASE です。 https : //back-media..com/editor ? id = 707646 / h6e90be6-7 EV6kJbV 2.2. Kafka設定変数を追加する依存パッケージを追加した後は、application.properties に kafka 構成変数を追加するだけで、基本的に通常どおり使用できます。 #Kafka サーバーのアドレスを指定します。クラスターには、カンマで区切られた複数のアドレスを含めることができます。 spring .kafka .bootstrap - サーバー= 197.168.25.196 : 9092 #再試行回数 spring .kafka .producer .retries = 3 #一括送信するメッセージの数 spring .kafka .producer .batch -サイズ= 1000 #32MB バッチバッファ spring .kafka .producer .buffer -メモリ= 33554432 #デフォルトのコンシューマ グループ spring .kafka .consumer .group - id = crm -マイクロサービス- newperformance #最も古い未使用オフセット spring .kafka .consumer .auto -オフセット-リセット=最も早い #一度に取得できるデータの最大量 spring .kafka .consumer .max -ポーリング-レコード= 4000 #自動的に送信するかどうか spring .kafka .consumer .enable - 自動コミット= true #自動送信時間間隔、単位 ms spring .kafka .consumer .auto -コミット-間隔= 1000 2.3.消費者を作成する @成分 パブリッククラス BigDataTopicListener {
プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;
@KafkaListener (トピック= { "big_data_topic" } ) パブリック void コンシューマー( ConsumerRecord < ? 、 ? >コンシューマー レコード) { log.info ( "bigData によってプッシュされたデータ '{}' を受信しました" 、 consumerRecord.toString ( ) ) ; // ... // db .save ( consumerRecord ) ; //データを挿入または更新する }
} 2.4.相手をシミュレートしてデータテストをプッシュする @RunWith ( SpringRunner.クラス) @SpringBootテスト パブリッククラス KafkaProducerTest {
オートワイヤード private KafkaTemplate < String , String > kafkaTemplate ;
@テスト パブリックボイドテスト送信( ) { ( int i = 0 ; i < 5000 ; i ++ ) { Map <文字列、オブジェクト> map = new LinkedHashMap <> ( ) ; マップ.put ( "datekey" , 20210610 ) ; map .put ( "userid" 、 i ) ; map .put ( "salaryAmount" 、 i ) ; // Kafka の big_data_topic トピックにデータをプッシュします kafkaTemplate .send ( "big_data_topic" 、 JSONObject .toJSONString ( map ) ) ; } } } 最初は、この単一のデータ消費方法を使用してプログラムをテストしても何も問題はありませんでした。 しかし、運用開始後に大きな問題が発覚しました。1,000 万個のデータを処理するのに少なくとも 3 時間かかり、データ ダッシュボードにデータが不足する結果となりました。 翌日、私は自分の失敗から学び、バッチ消費モデルに切り替えることにしました。どうやってやるんですか?以下をご覧ください! 2.5. Kafka の消費モードをバッチ消費に変更するまず、次の内容の KafkaConfiguration 構成クラスを作成します。 @構成 パブリッククラス KafkaConfiguration {
@値( "${spring.kafka.bootstrap-servers}" ) プライベート文字列 bootstrapServers ;
@値( "${spring.kafka.producer.retries}" ) プライベート整数再試行;
@値( "${spring.kafka.producer.batch-size}" ) プライベート整数バッチサイズ;
@Value ( "${spring.kafka.producer.buffer-memory}" ) プライベート整数バッファメモリ;
@値( "${spring.kafka.consumer.group-id}" ) プライベート文字列グループID ;
@Value ( "${spring.kafka.consumer.auto-offset-reset}" ) プライベート文字列 autoOffsetReset ;
@Value ( "${spring.kafka.consumer.max-poll-records}" ) プライベート整数maxPollRecords ;
@値( "${spring.kafka.consumer.batch.concurrency}" ) プライベートInteger batchConcurrency ;
@値( "${spring.kafka.consumer.auto-commit}" ) プライベートブール値autoCommit ;
@値( "${spring.kafka.consumer.auto-commit-interval}" ) プライベート整数autoCommitInterval ;
@ビーン パブリックマップ<文字列、オブジェクト>プロデューサー構成( ) { Map < String , Object > props = new HashMap <> ( ) ; props .put ( ProducerConfig .ACKS_CONFIG 、 "0" ) ; props .put ( ProducerConfig .BOOTSTRAP_SERVERS_CONFIG 、 bootstrapServers ) ; props .put ( ProducerConfig .RETRIES_CONFIG 、再試行) ; props .put ( ProducerConfig .BATCH_SIZE_CONFIG 、 batchSize ) ; props .put ( ProducerConfig .LINGER_MS_CONFIG 、 1 ) ; props .put ( ProducerConfig .BUFFER_MEMORY_CONFIG 、 bufferMemory ) ; props .put ( ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG 、 StringSerializer .class ) ; props .put ( ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG 、 StringSerializer .class ) ; プロパティを返します。 }
@ビーン パブリックプロデューサーファクトリー<文字列,文字列>プロデューサーファクトリー( ) { 新しい DefaultKafkaProducerFactory <> ( producerConfigs ( ) )を返します。 }
@ビーン パブリック KafkaTemplate <文字列,文字列> kafkaTemplate ( ) { 新しい KafkaTemplate <> ( producerFactory ( ) )を返します。 }
@ビーン パブリックマップ<文字列、オブジェクト> consumerConfigs ( ) { Map < String , Object > props = new HashMap <> ( ) ; props .put ( ConsumerConfig .GROUP_ID_CONFIG 、 groupId ) ; props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG 、 autoOffsetReset ) ; props .put ( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG 、 bootstrapServers ) ; props .put ( ConsumerConfig .MAX_POLL_RECORDS_CONFIG 、 maxPollRecords ) ; props .put ( ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG 、 autoCommit ) ; props .put ( ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG 、 30000 ) ; props .put ( ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG 、 30000 ) ; props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG 、 StringDeserializer .class ) ; props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG 、 StringDeserializer .class ) ; プロパティを返します。 }
@ビーン パブリック KafkaListenerContainerFactory < ? >バッチファクトリー( ) { ConcurrentKafkaListenerContainerFactory < Integer 、 String >ファクトリー= new ConcurrentKafkaListenerContainerFactory <> ( ) ; ファクトリ.setConsumerFactory (新しい DefaultKafkaConsumerFactory <> ( consumerConfigs ( ) ) ) ; //同時実行数をトピックパーティションの数以下に設定します ファクトリ.setConcurrency ( batchConcurrency ) ; ファクトリ.getContainerProperties ( ) . setPollTimeout ( 1500 ) ; ファクトリ.getContainerProperties ( ) .setAckMode ( ContainerProperties .AckMode .MANUAL_IMMEDIATE ) ; //バッチ消費に設定され、各バッチの数は Kafka 構成パラメータConsumerConfig.MAX_POLL_RECORDS_CONFIGで設定されます ファクトリ.setBatchListener ( true ) ; 工場を返却します。 }
} 同時に、同時実行数を設定するための新しい spring.kafka.consumer.batch.concurrency 変数が追加されました。このパラメータを通じて、消費を実現するために複数のスレッドを指定できます。 application.properties 構成ファイルに、次の変数を追加します。 #バッチ消費の同時実行数、トピックパーティションの数以下 spring .kafka .consumer .batch .concurrency = 3
#バッチプルの最大数を4000に設定する spring .kafka .consumer .max -ポーリング-レコード= 4000
#自動送信をfalseに設定する spring .kafka .consumer .enable - 自動コミット= false 最後に、単一消費方式をバッチ消費方式モードに変更します。 @成分 パブリッククラス BigDataTopicListener {
プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;
@KafkaListener (トピック= { "big_data_topic" } 、コンテナファクトリー= "batchFactory" ) パブリック void batchConsumer (リスト< ConsumerRecord < ? 、 ? >> consumerRecords 、確認応答 ack ) { 長い開始= System .currentTimeMillis ( ) ;
// ... // db .batchSave ( consumerRecords ) ; //データの一括挿入または一括更新
//手動送信 ack .acknowledge ( ) ; log .info ( "bigData によってプッシュされたデータを受信しました。プルされたデータの量: {}、消費時間: {} ミリ秒" 、 consumerRecords .size ( ) 、 ( System .currentTimeMillis ( ) - start ) ) ; }
} このとき、消費パフォーマンスが大幅に向上し、データ処理が非常に高速になります。最大30分で500万個のデータが消費される可能性があります。 この例では、コンシューマー マイクロサービスには本番環境に 3 つのサーバーがデプロイされており、big_data_topic トピックのパーティション数は 3 であるため、同時実行数を 3 に設定する方が適切です。 プッシュされるデータ量が増え続けるにつれて、消費速度が十分に速くないと感じた場合は、毎回プルされるバッチの最大数をリセットするか、マイクロサービスのクラスターインスタンス数とトピックパーティション数を水平に拡張して、データ消費を高速化することができます。 ただし、1 台のマシンで一度にプルされるバッチの最大数が大きすぎると、大きなオブジェクトも大きくなり、GC 警告が頻繁に発生します。 したがって、実際の使用においては、1 回あたりのバッチ プルの最大数は、多ければ多いほど良いというわけではありません。現在のサーバーのハードウェア構成に応じて、適切なしきい値に調整するのが最善の選択です。 3. まとめこの記事では、主に SpringBoot 技術フレームワークを背景として、実際のビジネスニーズを組み合わせ、データ消費に kafka を使用して高いデータ スループットを実現します。次回は、消費不良の処理フローについて紹介します。 |