SpringBoot は Kafka を統合して高いデータスループットを実現します

SpringBoot は Kafka を統合して高いデータスループットを実現します

1. はじめに

前回の記事では、Kafka のアーキテクチャモデルについて詳しく紹介しました。クラスター環境では、Kafka はパーティションの数を設定することでデータの消費を高速化できます。

理論を知っているだけでは十分ではなく、実際に実践する必要があります。

次回は、本番環境の実例とSpringBoot技術フレームワークをもとに、Kafkaの使い方と高いデータスループットを実現する方法を紹介します!

2. 手順の練習

最近、同社のビッグデータチームは、顧客の注文データを毎朝計算し、パフォーマンスデータを私たちにプッシュして、営業スタッフが毎日昨日のパフォーマンスデータを確認できるようにしています。データ量は約1000万です。以下は私のドッキングプロセスです。

2.1. kafka依存パッケージを追加する

このプロジェクトの SpringBoot バージョンは 2.1.5.RELEASE で、依存する kafka バージョンは 2.2.6.RELEASE です。

 https : //back-media..com/editor ? id = 707646 / h6e90be6-7 EV6kJbV

2.2. Kafka設定変数を追加する

依存パッケージを追加した後は、application.properties に kafka 構成変数を追加するだけで、基本的に通常どおり使用できます。

 #Kafka サーバーのアドレスを指定します。クラスターには、カンマで区切られた複数のアドレスを含めることができます。
spring .kafka .bootstrap - サーバー= 197.168.25.196 : 9092
#再試行回数
spring .kafka .producer .retries = 3
#一括送信するメッセージの数
spring .kafka .producer .batch -サイズ= 1000
#32MB バッチバッファ
spring .kafka .producer .buffer -メモリ= 33554432
#デフォルトのコンシューマ グループ
spring .kafka .consumer .group - id = crm -マイクロサービス- newperformance
#最も古い未使用オフセット
spring .kafka .consumer .auto -オフセット-リセット=最も早い
#一度に取得できるデータの最大量
spring .kafka .consumer .max -ポーリング-レコード= 4000
#自動的に送信するかどうか
spring .kafka .consumer .enable - 自動コミット= true
#自動送信時間間隔、単位 ms
spring .kafka .consumer .auto -コミット-間隔= 1000

2.3.消費者を作成する

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafkaデータを聴く
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } )
パブリック void コンシューマー( ConsumerRecord < ? ? >コンシューマー レコード) {
log.info ( "bigData によってプッシュされたデータ '{}' を受信しました" consumerRecord.toString ( ) ) ;
// ...
// db .save ( consumerRecord ) ; //データを挿入または更新する
}

}

2.4.相手をシミュレートしてデータテストをプッシュする

 @RunWith ( SpringRunner.クラス)
@SpringBootテスト
パブリッククラス KafkaProducerTest {

オートワイヤード
private KafkaTemplate < String , String > kafkaTemplate ;

@テスト
パブリックボイドテスト送信 {
( int i = 0 ; i < 5000 ; i ++ ) {
Map <文字列オブジェクト> map = new LinkedHashMap <> ( ) ;
マップ.put ( "datekey" , 20210610 ) ;
map .put ( "userid" i ) ;
map .put ( "salaryAmount" i ) ;
// Kafka の big_data_topic トピックにデータをプッシュします
kafkaTemplate .send ( "big_data_topic" JSONObject .toJSONString ( map ) ) ;
}
}
}

最初は、この単一のデータ消費方法を使用してプログラムをテストしても何も問題はありませんでした。

しかし、運用開始後に大きな問題が発覚しました。1,000 万個のデータを処理するのに少なくとも 3 時間かかり、データ ダッシュボードにデータが不足する結果となりました。

翌日、私は自分の失敗から学び、バッチ消費モデルに切り替えることにしました。どうやってやるんですか?以下をご覧ください!

2.5. Kafka の消費モードをバッチ消費に変更する

まず、次の内容の KafkaConfiguration 構成クラスを作成します。

 @構成
パブリッククラス KafkaConfiguration {

@値( "${spring.kafka.bootstrap-servers}" )
プライベート文字列 bootstrapServers ;

@値( "${spring.kafka.producer.retries}" )
プライベート整数再試行;

@値( "${spring.kafka.producer.batch-size}" )
プライベート整数バッチサイズ;

@Value ( "${spring.kafka.producer.buffer-memory}" )
プライベート整数バッファメモリ;

@値( "${spring.kafka.consumer.group-id}" )
プライベート文字列グループID ;

@Value ( "${spring.kafka.consumer.auto-offset-reset}" )
プライベート文字列 autoOffsetReset ;

@Value ( "${spring.kafka.consumer.max-poll-records}" )
プライベート整数maxPollRecords ;

@値( "${spring.kafka.consumer.batch.concurrency}" )
プライベートInteger batchConcurrency ;

@値( "${spring.kafka.consumer.auto-commit}" )
プライベートブール値autoCommit ;

@値( "${spring.kafka.consumer.auto-commit-interval}" )
プライベート整数autoCommitInterval ;


/**
* プロデューサー設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト>プロデューサー構成( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ProducerConfig .ACKS_CONFIG "0" ) ;
props .put ( ProducerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ProducerConfig .RETRIES_CONFIG 再試行) ;
props .put ( ProducerConfig .BATCH_SIZE_CONFIG batchSize ) ;
props .put ( ProducerConfig .LINGER_MS_CONFIG 1 ) ;
props .put ( ProducerConfig .BUFFER_MEMORY_CONFIG bufferMemory ) ;
props .put ( ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
props .put ( ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
プロパティを返します
}

/**
* メーカー工場
*/
@ビーン
パブリックプロデューサーファクトリー<文字列,文字列>プロデューサーファクトリー( ) {
新しい DefaultKafkaProducerFactory <> ( producerConfigs ( ) )を返します
}

/**
* プロデューサーテンプレート
*/
@ビーン
パブリック KafkaTemplate <文字列,文字列> kafkaTemplate ( ) {
新しい KafkaTemplate <> ( producerFactory ( ) )を返します
}


/**
* 消費者設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト> consumerConfigs ( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ConsumerConfig .GROUP_ID_CONFIG groupId ) ;
props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG autoOffsetReset ) ;
props .put ( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ConsumerConfig .MAX_POLL_RECORDS_CONFIG maxPollRecords ) ;
props .put ( ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG autoCommit ) ;
props .put ( ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
プロパティを返します
}

/**
* 消費者向けバッチ工場
*/
@ビーン
パブリック KafkaListenerContainerFactory < ? >バッチファクトリー( ) {
ConcurrentKafkaListenerContainerFactory < Integer String >ファクトリー= new ConcurrentKafkaListenerContainerFactory <> ( ) ;
ファクトリ.setConsumerFactory (新しい DefaultKafkaConsumerFactory <> ( consumerConfigs ( ) ) ) ;
//同時実行数をトピックパーティションの数以下に設定します
ファクトリ.setConcurrency ( batchConcurrency ) ;
ファクトリ.getContainerProperties ( ) . setPollTimeout ( 1500 ) ;
ファクトリ.getContainerProperties ( ) .setAckMode ( ContainerProperties .AckMode .MANUAL_IMMEDIATE ) ;
//バッチ消費に設定され、各バッチの数は Kafka 構成パラメータConsumerConfig.MAX_POLL_RECORDS_CONFIGで設定されます
ファクトリ.setBatchListener ( true ) ;
工場を返却します
}

}

同時に、同時実行数を設定するための新しい spring.kafka.consumer.batch.concurrency 変数が追加されました。このパラメータを通じて、消費を実現するために複数のスレッドを指定できます。

application.properties 構成ファイルに、次の変数を追加します。

 #バッチ消費の同時実行数、トピックパーティションの数以下
spring .kafka .consumer .batch .concurrency = 3

#バッチプルの最大数を4000に設定する
spring .kafka .consumer .max -ポーリング-レコード= 4000

#自動送信をfalseに設定する
spring .kafka .consumer .enable - 自動コミット= false

最後に、単一消費方式をバッチ消費方式モードに変更します。

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafka データの監視 (バッチ消費)
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } コンテナファクトリー= "batchFactory" )
パブリック void batchConsumer (リスト< ConsumerRecord < ? ? >> consumerRecords 確認応答 ack ) {
長い開始= System .currentTimeMillis ( ) ;

// ...
// db .batchSave ( consumerRecords ) ; //データの一括挿入または一括更新

//手動送信
ack .acknowledge ( ) ;
log .info ( "bigData によってプッシュされたデータを受信しました。プルされたデータの量: {}、消費時間: {} ミリ秒" consumerRecords .size ( ) ( System .currentTimeMillis ( ) - start ) ) ;
}

}

このとき、消費パフォーマンスが大幅に向上し、データ処理が非常に高速になります。最大30分で500万個のデータが消費される可能性があります。

この例では、コンシューマー マイクロサービスには本番環境に 3 つのサーバーがデプロイされており、big_data_topic トピックのパーティション数は 3 であるため、同時実行数を 3 に設定する方が適切です。

プッシュされるデータ量が増え続けるにつれて、消費速度が十分に速くないと感じた場合は、毎回プルされるバッチの最大数をリセットするか、マイクロサービスのクラスターインスタンス数とトピックパーティション数を水平に拡張して、データ消費を高速化することができます。

ただし、1 台のマシンで一度にプルされるバッチの最大数が大きすぎると、大きなオブジェクトも大きくなり、GC 警告が頻繁に発生します。

したがって、実際の使用においては、1 回あたりのバッチ プルの最大数は、多ければ多いほど良いというわけではありません。現在のサーバーのハードウェア構成に応じて、適切なしきい値に調整するのが最善の選択です。

3. まとめ

この記事では、主に SpringBoot 技術フレームワークを背景として、実際のビジネスニーズを組み合わせ、データ消費に kafka を使用して高いデータ スループットを実現します。次回は、消費不良の処理フローについて紹介します。

<<:  クラウド コンピューティングの進化: 「分散型クラウド」が最終形態となるか?

>>:  SaaSビジネスの成長に関する真実

推薦する

ユーザーと検索エンジンの究極の競争

あなたはこの記事に同意しないかもしれません。私はただこれについてあなたと議論しているだけです。あなた...

百度のザクロアルゴリズムに直面したとき、私たちは何に注意すべきか

2013年5月17日午後、百度のウェブ検索不正対策チームは百度ウェブマスタープラットフォームで、1週...

降格したウェブサイトを素早く復元する方法の例の共有

実際、このような記事を書く主な目的は、過去 1 か月間の Web サイト構築の経験をまとめることです...

budgetvmはどうですか? Budgetvm ロサンゼルス データセンター 独立サーバー 簡単な評価

budgetvmはどうですか? budgetvm の速度はどのくらいですか? budgetvm は良...

第1四半期から第3四半期までの衣料品ブランドのソーシャルメディアマーケティングに関する分析レポート

消費のグレードアップ、国民的トレンドの台頭、電子商取引ライブストリーミングの人気などの影響を受け、ア...

企業ブランドと上司の個人ブランド、どちらが重要ですか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています企業として...

売れ筋の ASUS P5QL/EPU が第一候補にならないわけがありません。

株式市場が回復し始めると、経済危機によって引き起こされた消費者のパニックは消えつつあり、IT市場も回...

実践スキル: 分散システムを体系的に学ぶにはどうすればよいでしょうか?

分散システムについて学ぶ前に、最初に解決する必要がある質問は、「分散システムはどのような問題を解決す...

StreamNativeは、クラウドネイティブ業界の高速な前進を支援するために、Tencent Cloud Native Acceleratorに正式に参加しました。

近年、インターネットのイノベーションは、消費者向けインターネットから産業向けインターネットへと変化し...

ニュース: DigitalOcean、インドのデータセンター設立を発表

DigitalOceanは本日、インド南部の都市バンガロールに新しいデータセンターを建設する計画を発...

necs 768M KVM 4.3 ポンド/月

necs.co.uk (英国登録会社 VAT 番号 927207819、逃げる心配はありません) で...

asmallorange: すべての仮想ホストが 40% オフ/年間支払いは最低 $14/cpanel パネル

asmallorangeでは、本日から10月4日まで、すべての仮想ホストを40%割引でご提供していま...

新しい企業ウェブサイトの外部リンクのレイアウト方法の分析例

外部リンク構築は、ウェブサイトの最適化にとって非常に重要です。新しいウェブサイトの場合、外部リンク構...

フレンドリーリンクを交換する際にウェブマスターが見落としがちな問題は何ですか?

フレンドリー リンクの交換は、ほとんどの Web サイト最適化担当者が行う必要がある作業です。Web...

オンラインでお金を稼ぐには、Taobaoを選択してください

インターネット時代の到来とともに、ますます多くのネットユーザーがこのグループに加わり、数え切れないほ...