SpringBoot は Kafka を統合して高いデータスループットを実現します

SpringBoot は Kafka を統合して高いデータスループットを実現します

1. はじめに

前回の記事では、Kafka のアーキテクチャモデルについて詳しく紹介しました。クラスター環境では、Kafka はパーティションの数を設定することでデータの消費を高速化できます。

理論を知っているだけでは十分ではなく、実際に実践する必要があります。

次回は、本番環境の実例とSpringBoot技術フレームワークをもとに、Kafkaの使い方と高いデータスループットを実現する方法を紹介します!

2. 手順の練習

最近、同社のビッグデータチームは、顧客の注文データを毎朝計算し、パフォーマンスデータを私たちにプッシュして、営業スタッフが毎日昨日のパフォーマンスデータを確認できるようにしています。データ量は約1000万です。以下は私のドッキングプロセスです。

2.1. kafka依存パッケージを追加する

このプロジェクトの SpringBoot バージョンは 2.1.5.RELEASE で、依存する kafka バージョンは 2.2.6.RELEASE です。

 https : //back-media..com/editor ? id = 707646 / h6e90be6-7 EV6kJbV

2.2. Kafka設定変数を追加する

依存パッケージを追加した後は、application.properties に kafka 構成変数を追加するだけで、基本的に通常どおり使用できます。

 #Kafka サーバーのアドレスを指定します。クラスターには、カンマで区切られた複数のアドレスを含めることができます。
spring .kafka .bootstrap - サーバー= 197.168.25.196 : 9092
#再試行回数
spring .kafka .producer .retries = 3
#一括送信するメッセージの数
spring .kafka .producer .batch -サイズ= 1000
#32MB バッチバッファ
spring .kafka .producer .buffer -メモリ= 33554432
#デフォルトのコンシューマ グループ
spring .kafka .consumer .group - id = crm -マイクロサービス- newperformance
#最も古い未使用オフセット
spring .kafka .consumer .auto -オフセット-リセット=最も早い
#一度に取得できるデータの最大量
spring .kafka .consumer .max -ポーリング-レコード= 4000
#自動的に送信するかどうか
spring .kafka .consumer .enable - 自動コミット= true
#自動送信時間間隔、単位 ms
spring .kafka .consumer .auto -コミット-間隔= 1000

2.3.消費者を作成する

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafkaデータを聴く
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } )
パブリック void コンシューマー( ConsumerRecord < ? ? >コンシューマー レコード) {
log.info ( "bigData によってプッシュされたデータ '{}' を受信しました" consumerRecord.toString ( ) ) ;
// ...
// db .save ( consumerRecord ) ; //データを挿入または更新する
}

}

2.4.相手をシミュレートしてデータテストをプッシュする

 @RunWith ( SpringRunner.クラス)
@SpringBootテスト
パブリッククラス KafkaProducerTest {

オートワイヤード
private KafkaTemplate < String , String > kafkaTemplate ;

@テスト
パブリックボイドテスト送信 {
( int i = 0 ; i < 5000 ; i ++ ) {
Map <文字列オブジェクト> map = new LinkedHashMap <> ( ) ;
マップ.put ( "datekey" , 20210610 ) ;
map .put ( "userid" i ) ;
map .put ( "salaryAmount" i ) ;
// Kafka の big_data_topic トピックにデータをプッシュします
kafkaTemplate .send ( "big_data_topic" JSONObject .toJSONString ( map ) ) ;
}
}
}

最初は、この単一のデータ消費方法を使用してプログラムをテストしても何も問題はありませんでした。

しかし、運用開始後に大きな問題が発覚しました。1,000 万個のデータを処理するのに少なくとも 3 時間かかり、データ ダッシュボードにデータが不足する結果となりました。

翌日、私は自分の失敗から学び、バッチ消費モデルに切り替えることにしました。どうやってやるんですか?以下をご覧ください!

2.5. Kafka の消費モードをバッチ消費に変更する

まず、次の内容の KafkaConfiguration 構成クラスを作成します。

 @構成
パブリッククラス KafkaConfiguration {

@値( "${spring.kafka.bootstrap-servers}" )
プライベート文字列 bootstrapServers ;

@値( "${spring.kafka.producer.retries}" )
プライベート整数再試行;

@値( "${spring.kafka.producer.batch-size}" )
プライベート整数バッチサイズ;

@Value ( "${spring.kafka.producer.buffer-memory}" )
プライベート整数バッファメモリ;

@値( "${spring.kafka.consumer.group-id}" )
プライベート文字列グループID ;

@Value ( "${spring.kafka.consumer.auto-offset-reset}" )
プライベート文字列 autoOffsetReset ;

@Value ( "${spring.kafka.consumer.max-poll-records}" )
プライベート整数maxPollRecords ;

@値( "${spring.kafka.consumer.batch.concurrency}" )
プライベートInteger batchConcurrency ;

@値( "${spring.kafka.consumer.auto-commit}" )
プライベートブール値autoCommit ;

@値( "${spring.kafka.consumer.auto-commit-interval}" )
プライベート整数autoCommitInterval ;


/**
* プロデューサー設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト>プロデューサー構成( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ProducerConfig .ACKS_CONFIG "0" ) ;
props .put ( ProducerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ProducerConfig .RETRIES_CONFIG 再試行) ;
props .put ( ProducerConfig .BATCH_SIZE_CONFIG batchSize ) ;
props .put ( ProducerConfig .LINGER_MS_CONFIG 1 ) ;
props .put ( ProducerConfig .BUFFER_MEMORY_CONFIG bufferMemory ) ;
props .put ( ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
props .put ( ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
プロパティを返します
}

/**
* メーカー工場
*/
@ビーン
パブリックプロデューサーファクトリー<文字列,文字列>プロデューサーファクトリー( ) {
新しい DefaultKafkaProducerFactory <> ( producerConfigs ( ) )を返します
}

/**
* プロデューサーテンプレート
*/
@ビーン
パブリック KafkaTemplate <文字列,文字列> kafkaTemplate ( ) {
新しい KafkaTemplate <> ( producerFactory ( ) )を返します
}


/**
* 消費者設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト> consumerConfigs ( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ConsumerConfig .GROUP_ID_CONFIG groupId ) ;
props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG autoOffsetReset ) ;
props .put ( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ConsumerConfig .MAX_POLL_RECORDS_CONFIG maxPollRecords ) ;
props .put ( ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG autoCommit ) ;
props .put ( ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
プロパティを返します
}

/**
* 消費者向けバッチ工場
*/
@ビーン
パブリック KafkaListenerContainerFactory < ? >バッチファクトリー( ) {
ConcurrentKafkaListenerContainerFactory < Integer String >ファクトリー= new ConcurrentKafkaListenerContainerFactory <> ( ) ;
ファクトリ.setConsumerFactory (新しい DefaultKafkaConsumerFactory <> ( consumerConfigs ( ) ) ) ;
//同時実行数をトピックパーティションの数以下に設定します
ファクトリ.setConcurrency ( batchConcurrency ) ;
ファクトリ.getContainerProperties ( ) . setPollTimeout ( 1500 ) ;
ファクトリ.getContainerProperties ( ) .setAckMode ( ContainerProperties .AckMode .MANUAL_IMMEDIATE ) ;
//バッチ消費に設定され、各バッチの数は Kafka 構成パラメータConsumerConfig.MAX_POLL_RECORDS_CONFIGで設定されます
ファクトリ.setBatchListener ( true ) ;
工場を返却します
}

}

同時に、同時実行数を設定するための新しい spring.kafka.consumer.batch.concurrency 変数が追加されました。このパラメータを通じて、消費を実現するために複数のスレッドを指定できます。

application.properties 構成ファイルに、次の変数を追加します。

 #バッチ消費の同時実行数、トピックパーティションの数以下
spring .kafka .consumer .batch .concurrency = 3

#バッチプルの最大数を4000に設定する
spring .kafka .consumer .max -ポーリング-レコード= 4000

#自動送信をfalseに設定する
spring .kafka .consumer .enable - 自動コミット= false

最後に、単一消費方式をバッチ消費方式モードに変更します。

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafka データの監視 (バッチ消費)
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } コンテナファクトリー= "batchFactory" )
パブリック void batchConsumer (リスト< ConsumerRecord < ? ? >> consumerRecords 確認応答 ack ) {
長い開始= System .currentTimeMillis ( ) ;

// ...
// db .batchSave ( consumerRecords ) ; //データの一括挿入または一括更新

//手動送信
ack .acknowledge ( ) ;
log .info ( "bigData によってプッシュされたデータを受信しました。プルされたデータの量: {}、消費時間: {} ミリ秒" consumerRecords .size ( ) ( System .currentTimeMillis ( ) - start ) ) ;
}

}

このとき、消費パフォーマンスが大幅に向上し、データ処理が非常に高速になります。最大30分で500万個のデータが消費される可能性があります。

この例では、コンシューマー マイクロサービスには本番環境に 3 つのサーバーがデプロイされており、big_data_topic トピックのパーティション数は 3 であるため、同時実行数を 3 に設定する方が適切です。

プッシュされるデータ量が増え続けるにつれて、消費速度が十分に速くないと感じた場合は、毎回プルされるバッチの最大数をリセットするか、マイクロサービスのクラスターインスタンス数とトピックパーティション数を水平に拡張して、データ消費を高速化することができます。

ただし、1 台のマシンで一度にプルされるバッチの最大数が大きすぎると、大きなオブジェクトも大きくなり、GC 警告が頻繁に発生します。

したがって、実際の使用においては、1 回あたりのバッチ プルの最大数は、多ければ多いほど良いというわけではありません。現在のサーバーのハードウェア構成に応じて、適切なしきい値に調整するのが最善の選択です。

3. まとめ

この記事では、主に SpringBoot 技術フレームワークを背景として、実際のビジネスニーズを組み合わせ、データ消費に kafka を使用して高いデータ スループットを実現します。次回は、消費不良の処理フローについて紹介します。

<<:  クラウド コンピューティングの進化: 「分散型クラウド」が最終形態となるか?

>>:  SaaSビジネスの成長に関する真実

推薦する

nodepop-512M メモリ/10G ハードディスク/月額 6 ドル/HE ホーム フリーモント データ センター

nodepop.com は新しく設立された小規模なホスティングサービス会社です。現在は規模が小さく、...

FluxCD を使用した Kubernetes GitOps の実装

Flux は、Kubernetes 向けの継続的デリバリーおよびプログレッシブデリバリーのソリューシ...

テンセントクラウドは平頂山銀行と提携し、インターネット金融エコシステムを共同で構築

6月5日、テンセントクラウドと平頂山銀行は戦略的協力協定を締結した。両者は金融テクノロジーなど複数の...

クラウド ストレージのセキュリティ: データ暗号化メカニズムとセキュリティ レベルの簡単な分析

「 2022年企業のマルチクラウド環境の保護」レポートの調査では、クラウド環境でデータにアクセス/維...

ライブストリーミングルームに「閉じ込められた」電子商取引企業

ライブストリーミング電子商取引は近年急速に発展しています。一方では、電子商取引ライブストリーミングの...

PythonバッチクエリBaiduが含まれています

PythonはSEO学習に非常に適した言語です。構文が簡単なだけでなく、さまざまなライブラリを通じて...

百度の検索結果2ページ目上部に表示される関連検索についての考察

ウェブマスターとして、百度の変化に注目することは、すべてのウェブマスターが毎日行うべきことです。今夜...

新しい状況下で、ウェブマスターはどのようにして現場構築をマスターするのでしょうか?

「外部リンクが王様、コンテンツが王様」という最適化の時代に別れを告げ、多くのウェブマスターは百度の相...

中国のゲームで外国産ネギを収穫する正しい方法

2019年8月1日、世界的なデジタルエンターテインメントイベントChinaJoyの前日。テンセントア...

ITシステムアプリケーション開発の開発動向を分析してみよう

実際、クラウド ネイティブ テクノロジーがほとんどの IT システムが目指すべき目標となっていること...

おすすめ: photonvps-エンタープライズ VPS 50% オフ プロモーション (中国語 + Alipay をサポート)

何ということでしょう?チャンスは一瞬です、ハハハ! photonvps(通称ライスバケット)では、現...

IBM ハイブリッド クラウド業界エコシステム: 規制の厳しい業界に「ユートピア的な未来」を構築

【ユートピア】オンライン百科事典の定義によれば、「ユートピア」は「理想郷(理想社会)」とも呼ばれ、理...

SEO 欲張りすぎると行き詰まってしまいます

深センの天気は相変わらず晴れ。珍しい週末なのでゆっくり休めると思っていたのですが、電話がかかってきて...

おすすめ: ultravps.eu - 1.6 ユーロの VPS の簡単な紹介とレビュー

私は ultravps.eu から VPS を入手し、しばらく使ってみました。それについての私の一般...

「仮想IP」に関する漫画

[[396894]] コンピュータの IP アドレスと MAC アドレスの対応を取得するには、次の ...