SpringbootはKafka Streamのリアルタイム統計を統合します

SpringbootはKafka Streamのリアルタイム統計を統合します

[[417927]]

環境: springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka ストリームの紹介

Kafka はバージョン 0.10 で Stream API をリリースしました。これにより、Kafka に保存されているデータに対してストリーム処理と分析を実行する機能が提供されます。

ストリーミング コンピューティングは一般にバッチ コンピューティングと比較されます。バッチ コンピューティングでは、多くの場合、固定データ セットを入力として受け取り、結果を計算します。ストリーミング コンピューティングの入力は多くの場合「無制限」(Unbounded Data) であり、これは連続的な入力です。つまり、計算のためにデータの全量を取得することはできません。同時に計算結果も連続的に出力され、最終結果ではなく、ある瞬間の結果のみが得られます。

Kafka Streams は、Kafka に保存されているデータを処理および分析するためのクライアント ライブラリです。これは、イベント時間と処理時間を区別する方法、ウィンドウ処理のサポート、シンプルで効率的な管理、アプリケーション ステータスのリアルタイム クエリなど、ストリーム処理のいくつかの重要な概念に基づいて構築されています。

Kafka Streams の敷居は非常に低く、通常の Kafka メッセージ処理プログラムの作成とほとんど変わらず、マルチプロセス展開による容量拡張、負荷分散、高可用性 (Kafka Consumer の並列モデル) を実現できます。

Kafka Streams の機能の一部:

  • あらゆるJavaアプリケーションに統合できるシンプルで軽量なクライアントライブラリとして設計されています
  • Kafka 以外の追加の依存関係はなく、水平拡張をサポートし、連続性を保証するために Kafka のパーティショニング モデルが使用されます。
  • フォールトトレラントな状態ストレージによる効率的なステートフル操作(ウィンドウ結合と集計)
  • 正確に1回だけのセマンティクスをサポート
  • レコードレベルの処理をサポートし、ミリ秒レベルのレイテンシを実現
  • 高レベルストリームDSLと低レベルプロセッサAPIを提供

ストリーム処理トポロジ

  • ストリームは、Kafka Streams によって提供される最も重要な抽象化であり、無制限で継続的に更新されるデータセットを表します。ストリームは、不変のデータ レコードの順序付けされた、再生可能でフォールト トレラントなシーケンスであり、データ レコードはキーと値のペアとして定義されます。
  • ストリーム処理アプリケーションは、Kafka Streams ライブラリを使用するアプリケーションです。これは、プロセッサ トポロジを通じて計算ロジックを定義します。各プロセッサ トポロジは、ストリームを介した複数のストリーム プロセッサ (ノード) のグラフです。
  • ストリーム プロセッサは、プロセッサ トポロジ内のノードです。これは、トポロジ内の上流プロセッサから一度に 1 つの入力レコードを受信し、そのレコードに操作を適用してストリーム内のデータを変換し、その後、下流プロセッサに 1 つ以上の出力レコードを生成する処理ステップを表します。

特別なプロセッサが 2 つあります。

ソース プロセッサは、アップストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。 1 つ以上の Kafka トピックからレコードを消費し、下流のプロセッサに転送することで、トポロジの入力ストリームを生成します。

シンク プロセッサ シンク プロセッサは、ダウンストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。上流プロセッサから受信したレコードを指定された kafka トピックに送信します。


関連するコアコンセプトについては、以下のリンクを参照してください。


以下はSpringbootでのKafka Streamの適用例です。

頼る

  1. <依存関係>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </依存関係>
  5. <依存関係>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>スプリングカフカ</artifactId>
  8. </依存関係>
  9. <依存関係>
  10. <グループ ID>org.apache.kafka</グループ ID>
  11. <artifactId>kafka-streams</artifactId>
  12. </依存関係>

構成

  1. サーバ:
  2. ポート: 9090
  3. 春:
  4. 応用:
  5. 名前: kafka-demo
  6. カフカ:
  7. ストリーム:
  8. アプリケーションID : ${spring.application.name }
  9. プロパティ:
  10. spring.json.trusted.packages: '*'  
  11. ブートストラップサーバー:
  12. - ローカルホスト:9092
  13. - ローカルホスト:9093
  14. - ローカルホスト:9094
  15. プロデューサー:
  16. 回答: 1
  17. 再試行: 10
  18. キー-serializer: org.apache.kafka.common.serialization.StringSerializer
  19. 値シリアライザー: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
  20. プロパティ:
  21. spring.json.trusted.packages: '*'  
  22. 消費者:
  23. キー-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  24. 値デシリアライザー: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
  25. 自動コミットを有効にする: false  
  26. グループID: ConsumerTest
  27. 自動オフセットリセット: 最新
  28. プロパティ:
  29. セッションタイムアウト: 12000
  30. ハートビート間隔: 3000
  31. 最大投票レコード数: 100
  32. spring.json.trusted.packages: '*'  
  33. リスナー:
  34. ack-mode: 手動即時
  35. タイプ: バッチ
  36. 同時実行数: 8
  37. プロパティ:
  38. 最大ポーリング間隔(ミリ秒) : 300000

メッセージ送信

  1. @サービス
  2. パブリッククラスMessageSend {
  3. @リソース
  4. private KafkaTemplate<String, Message> kafkaTemplate;
  5. パブリックvoid sendMessage2(メッセージ メッセージ) {
  6. kafkaTemplate.send(新しい ProducerRecord<String, Message>( "test" 、 message)).addCallback(結果 -> {
  7. システム。 out .println( "実行が成功しました..." + Thread.currentThread().getName()) ;
  8. }, 例 -> {
  9. システム。 out .println( "実行に失敗しました" );
  10. 例:printStackTrace();
  11. });
  12. }
  13. }

メッセージ監視

  1. @KafkaListener(トピック = { "テスト" })
  2. パブリックvoid listener2(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  3. ( ConsumerRecord<String, Message> レコード: レコード) {
  4. システム。 out .println(this.getClass().hashCode() + ", スレッド" + Thread.currentThread().getName() + ", キー: " + record.key ( ) + ", 受信メッセージ: " + record.value() + ", パーティション: " + record.partition() + ", オフセット: " + record.offset());
  5. }
  6. 試す {
  7. TimeUnit.SECONDS.sleep(0);
  8. } キャッチ (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. ack.acknowledge() ;
  12. }
  13.      
  14. @KafkaListener(トピック = { "デモ" })
  15. パブリックvoid listenerDemo(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  16. ( ConsumerRecord<String, Message> レコード: レコード) {
  17. システム。 out .println( "デモトピック: " + this.getClass().hashCode() + "、スレッド" + Thread.currentThread().getName() + "、キー: " + record.key ( ) + "、受信メッセージ: " + record.value() + "、パーティション: " + record.partition() + "、オフセット: " + record.offset());
  18. }
  19. ack.acknowledge() ;
  20. }

Kafka ストリーム処理

メッセージの変換と他のトピックへの転送

  1. @ビーン
  2. パブリックKStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
  3. KStream<Object, Object> ストリーム = streamsBuilder.stream( "テスト" );
  4. stream.map((キー、値) -> {
  5. システム。 out .println( "元のメッセージの内容: " + new String((byte[]) value, Charset.forName( "UTF-8" ))) ;
  6. return new KeyValue<>( key , "{\"title\": \"123123\", \"message\": \"コンテンツを再定義\"}" .getBytes(Charset.forName( "UTF-8" ))) ;
  7. })。 「デモ」
  8. ストリームを返します
  9. }

実行結果:


ストリームオブジェクトの処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream4(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.map((キー、値) -> {
  8. value.setTitle( "XXXXXXX" );
  9. 新しいKeyValue<>(キー、値)を返します
  10. })。 to ( "demo" (Serdes.String()、jsonSerde)を使用して生成されます) ;
  11. ストリームを返します
  12. }

実行結果:


グループ処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream5(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .count () 関数
  15. .toStream().print(Printed.toSysOut());
  16. ストリームを返します
  17. }

実行結果:


重合

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream6(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream().print(Printed.toSysOut());
  19. ストリームを返します
  20. }

実行結果:


フィルター データをフィルター

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream7(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream()
  19. .filter((キー, 値) -> ! "2" .equals(キー))
  20. .print(Printed.toSysOut());
  21. ストリームを返します
  22. }

実行結果:


フィルターキーが「2」と等しくありません

ブランチマルチストリーム処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream8(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. // ブランチ、マルチストリーム処理
  8. KStream<文字列、メッセージ>[] arrStream = stream.branch(
  9. (キー, 値 ) -> "男性" .equals(value.getSex()),
  10. (キー、値) -> "女性" .equals(value.getSex()));
  11. ストリーム。 ( arrStream ).forEach(-> {
  12. .foreach ((キー、メッセージ) -> {
  13. システム。出力.println(Thread.currentThread().getName() + ", キー = " +キー+ ", メッセージ = " + メッセージ);
  14. });
  15. });
  16. ストリームを返します
  17. }

実行結果:


複数フィールドのグループ化

複数のselectKeyを使用することはできません。後者は前者を上書きします。

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStreamM2(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. ストリーム
  8. .selectKey(新しいKeyValueMapper<文字列、メッセージ、文字列>() {
  9. @オーバーライド
  10. パブリック文字列適用(文字列キー、メッセージ値) {
  11. System.out .println (Thread.currentThread().getName());
  12. 戻り値.getTime() + " | " + value.getOrgCode() ;
  13. }
  14. })
  15. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  16. .count () 関数
  17. .toStream().print(Printed.toSysOut());
  18. ストリームを返します
  19. }

実行結果:


<<:  我が国の政府クラウド開発の現状と動向の分析

>>:  クラウドネイティブの進化のトレンドにおける従来のデータベースアップグレードの実践

推薦する

ウェブサイトの統計情報を使用して、さまざまな観点からコンバージョン率を調査し、改善します。

ウェブサイトのコンバージョン率は、すべてのウェブマスターが追求しているデータです。ウェブサイトの目標...

ウェブサイトタイトルの変更が検索エンジンに与える影響の分析例

今年、百度の「6.28事件」が突如発生し、検索業界の長らく沈黙していた戦場が破られた。「8.22事件...

SaaS をサービスの観点から見ると何がわかるでしょうか?

実際、サービス経済の到来により、労働力の大部分が農業や製造業からサービス産業に大規模に移行するのと同...

BaiduとGoogleの違い

Baidu と Google の違いについて議論し、Baidu と Google の違いについて良い...

医療業界向けのクラウド移行とオンプレミスソリューション

急速に進化する今日の医療環境において、テクノロジーは患者のケアの確保、データの管理、運用効率の維持に...

ウェブサイトを最適化する前に考慮すべき質問

検索エンジン最適化(SEO)は10年前から中国にひっそりと導入されてきました。検索エンジンは小規模か...

量子コンピューティングの可能性を解き放つ:ゲームを変える技術

量子コンピューティングは、技術進歩における次のフロンティアとして長い間歓迎されてきました。比類のない...

B2Bプラットフォームで転送率の高い情報を発信する方法

当社が情報公開する最終的な目的は取引量です。 B2B プラットフォームは数千万あり、多くの営業マンは...

SAP Business Oneが新しい小売業を支援、Tmall Double 11がテストの場に

2017年11月11日まで残り12時間を切った現在、SKII、Olay、Pampers、Vidal ...

モバイル電子商取引会社Yunqi Acceleratorは、ユーザーに情報セキュリティを最優先するよう呼びかけている。

近年、電子商取引は非常に人気が高まっており、特にモバイルインターネットの普及以降、多くの電子商取引ブ...

信頼できるアメリカのcn2gia VPS業者数社から推奨されており、帯域幅が大きく、評判が良く、コストパフォーマンスが高い

米国のCN2 GIA(AS4809)は、中国と米国が民間レベルで提供した最も初期かつ最速のネットワー...

SAP: さらなるイノベーションでインテリジェント時代を切り拓く

[51CTO.comより引用] 2020年のキーワードといえば、「疫病」が間違いなく第1位にランクさ...

新しいウェブサイトを公開した後の最適化の 4 つの成長段階

フェーズ1: 新しいウェブサイトの立ち上げ「権威の高いウェブサイトに外部リンクを残す」か「Baidu...