SpringBoot は Kafka を統合して高いデータスループットを実現します

SpringBoot は Kafka を統合して高いデータスループットを実現します

1. はじめに

前回の記事では、Kafka のアーキテクチャモデルについて詳しく紹介しました。クラスター環境では、Kafka はパーティションの数を設定することでデータの消費を高速化できます。

理論を知っているだけでは十分ではなく、実際に実践する必要があります。

次回は、本番環境の実例とSpringBoot技術フレームワークをもとに、Kafkaの使い方と高いデータスループットを実現する方法を紹介します!

2. 手順の練習

最近、同社のビッグデータチームは、顧客の注文データを毎朝計算し、パフォーマンスデータを私たちにプッシュして、営業スタッフが毎日昨日のパフォーマンスデータを確認できるようにしています。データ量は約1000万です。以下は私のドッキングプロセスです。

2.1. kafka依存パッケージを追加する

このプロジェクトの SpringBoot バージョンは 2.1.5.RELEASE で、依存する kafka バージョンは 2.2.6.RELEASE です。

 https : //back-media..com/editor ? id = 707646 / h6e90be6-7 EV6kJbV

2.2. Kafka設定変数を追加する

依存パッケージを追加した後は、application.properties に kafka 構成変数を追加するだけで、基本的に通常どおり使用できます。

 #Kafka サーバーのアドレスを指定します。クラスターには、カンマで区切られた複数のアドレスを含めることができます。
spring .kafka .bootstrap - サーバー= 197.168.25.196 : 9092
#再試行回数
spring .kafka .producer .retries = 3
#一括送信するメッセージの数
spring .kafka .producer .batch -サイズ= 1000
#32MB バッチバッファ
spring .kafka .producer .buffer -メモリ= 33554432
#デフォルトのコンシューマ グループ
spring .kafka .consumer .group - id = crm -マイクロサービス- newperformance
#最も古い未使用オフセット
spring .kafka .consumer .auto -オフセット-リセット=最も早い
#一度に取得できるデータの最大量
spring .kafka .consumer .max -ポーリング-レコード= 4000
#自動的に送信するかどうか
spring .kafka .consumer .enable - 自動コミット= true
#自動送信時間間隔、単位 ms
spring .kafka .consumer .auto -コミット-間隔= 1000

2.3.消費者を作成する

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafkaデータを聴く
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } )
パブリック void コンシューマー( ConsumerRecord < ? ? >コンシューマー レコード) {
log.info ( "bigData によってプッシュされたデータ '{}' を受信しました" consumerRecord.toString ( ) ) ;
// ...
// db .save ( consumerRecord ) ; //データを挿入または更新する
}

}

2.4.相手をシミュレートしてデータテストをプッシュする

 @RunWith ( SpringRunner.クラス)
@SpringBootテスト
パブリッククラス KafkaProducerTest {

オートワイヤード
private KafkaTemplate < String , String > kafkaTemplate ;

@テスト
パブリックボイドテスト送信 {
( int i = 0 ; i < 5000 ; i ++ ) {
Map <文字列オブジェクト> map = new LinkedHashMap <> ( ) ;
マップ.put ( "datekey" , 20210610 ) ;
map .put ( "userid" i ) ;
map .put ( "salaryAmount" i ) ;
// Kafka の big_data_topic トピックにデータをプッシュします
kafkaTemplate .send ( "big_data_topic" JSONObject .toJSONString ( map ) ) ;
}
}
}

最初は、この単一のデータ消費方法を使用してプログラムをテストしても何も問題はありませんでした。

しかし、運用開始後に大きな問題が発覚しました。1,000 万個のデータを処理するのに少なくとも 3 時間かかり、データ ダッシュボードにデータが不足する結果となりました。

翌日、私は自分の失敗から学び、バッチ消費モデルに切り替えることにしました。どうやってやるんですか?以下をご覧ください!

2.5. Kafka の消費モードをバッチ消費に変更する

まず、次の内容の KafkaConfiguration 構成クラスを作成します。

 @構成
パブリッククラス KafkaConfiguration {

@値( "${spring.kafka.bootstrap-servers}" )
プライベート文字列 bootstrapServers ;

@値( "${spring.kafka.producer.retries}" )
プライベート整数再試行;

@値( "${spring.kafka.producer.batch-size}" )
プライベート整数バッチサイズ;

@Value ( "${spring.kafka.producer.buffer-memory}" )
プライベート整数バッファメモリ;

@値( "${spring.kafka.consumer.group-id}" )
プライベート文字列グループID ;

@Value ( "${spring.kafka.consumer.auto-offset-reset}" )
プライベート文字列 autoOffsetReset ;

@Value ( "${spring.kafka.consumer.max-poll-records}" )
プライベート整数maxPollRecords ;

@値( "${spring.kafka.consumer.batch.concurrency}" )
プライベートInteger batchConcurrency ;

@値( "${spring.kafka.consumer.auto-commit}" )
プライベートブール値autoCommit ;

@値( "${spring.kafka.consumer.auto-commit-interval}" )
プライベート整数autoCommitInterval ;


/**
* プロデューサー設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト>プロデューサー構成( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ProducerConfig .ACKS_CONFIG "0" ) ;
props .put ( ProducerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ProducerConfig .RETRIES_CONFIG 再試行) ;
props .put ( ProducerConfig .BATCH_SIZE_CONFIG batchSize ) ;
props .put ( ProducerConfig .LINGER_MS_CONFIG 1 ) ;
props .put ( ProducerConfig .BUFFER_MEMORY_CONFIG bufferMemory ) ;
props .put ( ProducerConfig .KEY_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
props .put ( ProducerConfig .VALUE_SERIALIZER_CLASS_CONFIG StringSerializer .class ) ;
プロパティを返します
}

/**
* メーカー工場
*/
@ビーン
パブリックプロデューサーファクトリー<文字列,文字列>プロデューサーファクトリー( ) {
新しい DefaultKafkaProducerFactory <> ( producerConfigs ( ) )を返します
}

/**
* プロデューサーテンプレート
*/
@ビーン
パブリック KafkaTemplate <文字列,文字列> kafkaTemplate ( ) {
新しい KafkaTemplate <> ( producerFactory ( ) )を返します
}


/**
* 消費者設定情報
*/
@ビーン
パブリックマップ<文字列オブジェクト> consumerConfigs ( ) {
Map < String , Object > props = new HashMap <> ( ) ;
props .put ( ConsumerConfig .GROUP_ID_CONFIG groupId ) ;
props .put ( ConsumerConfig .AUTO_OFFSET_RESET_CONFIG autoOffsetReset ) ;
props .put ( ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG bootstrapServers ) ;
props .put ( ConsumerConfig .MAX_POLL_RECORDS_CONFIG maxPollRecords ) ;
props .put ( ConsumerConfig .ENABLE_AUTO_COMMIT_CONFIG autoCommit ) ;
props .put ( ConsumerConfig .SESSION_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .REQUEST_TIMEOUT_MS_CONFIG 30000 ) ;
props .put ( ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
props .put ( ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG StringDeserializer .class ) ;
プロパティを返します
}

/**
* 消費者向けバッチ工場
*/
@ビーン
パブリック KafkaListenerContainerFactory < ? >バッチファクトリー( ) {
ConcurrentKafkaListenerContainerFactory < Integer String >ファクトリー= new ConcurrentKafkaListenerContainerFactory <> ( ) ;
ファクトリ.setConsumerFactory (新しい DefaultKafkaConsumerFactory <> ( consumerConfigs ( ) ) ) ;
//同時実行数をトピックパーティションの数以下に設定します
ファクトリ.setConcurrency ( batchConcurrency ) ;
ファクトリ.getContainerProperties ( ) . setPollTimeout ( 1500 ) ;
ファクトリ.getContainerProperties ( ) .setAckMode ( ContainerProperties .AckMode .MANUAL_IMMEDIATE ) ;
//バッチ消費に設定され、各バッチの数は Kafka 構成パラメータConsumerConfig.MAX_POLL_RECORDS_CONFIGで設定されます
ファクトリ.setBatchListener ( true ) ;
工場を返却します
}

}

同時に、同時実行数を設定するための新しい spring.kafka.consumer.batch.concurrency 変数が追加されました。このパラメータを通じて、消費を実現するために複数のスレッドを指定できます。

application.properties 構成ファイルに、次の変数を追加します。

 #バッチ消費の同時実行数、トピックパーティションの数以下
spring .kafka .consumer .batch .concurrency = 3

#バッチプルの最大数を4000に設定する
spring .kafka .consumer .max -ポーリング-レコード= 4000

#自動送信をfalseに設定する
spring .kafka .consumer .enable - 自動コミット= false

最後に、単一消費方式をバッチ消費方式モードに変更します。

 @成分
パブリッククラス BigDataTopicListener {

プライベート静的最終 Logger ログ= LoggerFactory .getLogger ( BigDataTopicListener .class ) ;

/**
* kafka データの監視 (バッチ消費)
* @param コンシューマーレコード
* @param ack
*/
@KafkaListener (トピック= { "big_data_topic" } コンテナファクトリー= "batchFactory" )
パブリック void batchConsumer (リスト< ConsumerRecord < ? ? >> consumerRecords 確認応答 ack ) {
長い開始= System .currentTimeMillis ( ) ;

// ...
// db .batchSave ( consumerRecords ) ; //データの一括挿入または一括更新

//手動送信
ack .acknowledge ( ) ;
log .info ( "bigData によってプッシュされたデータを受信しました。プルされたデータの量: {}、消費時間: {} ミリ秒" consumerRecords .size ( ) ( System .currentTimeMillis ( ) - start ) ) ;
}

}

このとき、消費パフォーマンスが大幅に向上し、データ処理が非常に高速になります。最大30分で500万個のデータが消費される可能性があります。

この例では、コンシューマー マイクロサービスには本番環境に 3 つのサーバーがデプロイされており、big_data_topic トピックのパーティション数は 3 であるため、同時実行数を 3 に設定する方が適切です。

プッシュされるデータ量が増え続けるにつれて、消費速度が十分に速くないと感じた場合は、毎回プルされるバッチの最大数をリセットするか、マイクロサービスのクラスターインスタンス数とトピックパーティション数を水平に拡張して、データ消費を高速化することができます。

ただし、1 台のマシンで一度にプルされるバッチの最大数が大きすぎると、大きなオブジェクトも大きくなり、GC 警告が頻繁に発生します。

したがって、実際の使用においては、1 回あたりのバッチ プルの最大数は、多ければ多いほど良いというわけではありません。現在のサーバーのハードウェア構成に応じて、適切なしきい値に調整するのが最善の選択です。

3. まとめ

この記事では、主に SpringBoot 技術フレームワークを背景として、実際のビジネスニーズを組み合わせ、データ消費に kafka を使用して高いデータ スループットを実現します。次回は、消費不良の処理フローについて紹介します。

<<:  クラウド コンピューティングの進化: 「分散型クラウド」が最終形態となるか?

>>:  SaaSビジネスの成長に関する真実

推薦する

テンセントカンファレンスの拡大の裏側:100万コアのコンピューティングリソースはすべて自社開発のサーバー星星海によって支えられている

パンデミックの間、リモート会議や共同作業の需要が急増しました。 1月29日から2月6日まで、Tenc...

SEO は高級ワインのようなものです。味見して初めてその意味が理解できます。

SEO はワインのようなものです。味わい方を知っている人だけが、その深い意味を知っています。ワインの...

2020年のトレンド予測: SaaSモデルが企業のデジタル化の第一選択肢に

2019 年が終わりに近づき、2020 年は新たな旅の始まりです。振り返ってみると、エンタープライズ...

#専用サーバーの推奨# tmhhost、388元/月、30M CN2 GIAまたは50M CUII、E3-1230v5/16gメモリ/250gSSD/2IP

tmhhostは年末のクリアランスプロセスを開始しました。ロサンゼルスCN2 GIA(AS4809)...

Tuanbao.comのCEOである任春雷氏は逃亡し、北京本社は現在無人となっていると言われている。

1月24日、ネットユーザーらは、Tuanbao.comのCEOである任春雷氏が「逃亡」し、北京本社は...

ウェブサイトの統計情報を使用して、さまざまな観点からコンバージョン率を調査し、改善します。

ウェブサイトのコンバージョン率は、すべてのウェブマスターが追求しているデータです。ウェブサイトの目標...

Baidu のウェブサイトコンテンツの識別について

先月から旅行情報サイトを2つ作成しました。現在、サイトは充実しており、スナップショットもリアルタイム...

検索エンジンのランキングを改善し、収益の向上を実現します

企業が検索エンジンのランキングを向上させようとすればするほど、ビジネスにとって価値あるものがさらに増...

タオバオSEOの特徴2

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますキーワード...

クラウド移行を成功させるためのロードマップを構築する方法

今年の新型コロナウイルス感染症のパンデミックは多くの企業にとって混乱を招き、ビジネスリーダーは大規模...

ウェブマスターはブラックリストに載らないようにするにはどうすればよいでしょうか?

ウェブマスターにとって、自分のウェブサイトがブラックリンクにリンクされるのは、非常に憂鬱なことだと思...

記事を検索エンジンに素早くインデックスさせる方法

多くの初心者ウェブマスターの目には、ウェブサイトをインデックスに登録するのは難しいように見えます。特...

SUSE、ローカリゼーション向けRancher Enterprise Edition 2.6をリリース

Rancher Enterprise Edition は、「Rancher China」の時代に初め...

専門家オンライン: 未来の工場の 9 つの機能

[51CTO.comからのオリジナル記事] クラウドコンピューティング、ビッグデータ、モノのインター...

IDC:中国のパブリッククラウドマネージドセキュリティサービス市場規模は2021年に6,420万米ドルに達する見込み

IDCが発表した調査レポート「中国のパブリッククラウドマネージドセキュリティサービス市場シェア、20...