SpringbootはKafka Streamのリアルタイム統計を統合します

SpringbootはKafka Streamのリアルタイム統計を統合します

[[417927]]

環境: springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka ストリームの紹介

Kafka はバージョン 0.10 で Stream API をリリースしました。これにより、Kafka に保存されているデータに対してストリーム処理と分析を実行する機能が提供されます。

ストリーミング コンピューティングは一般にバッチ コンピューティングと比較されます。バッチ コンピューティングでは、多くの場合、固定データ セットを入力として受け取り、結果を計算します。ストリーミング コンピューティングの入力は多くの場合「無制限」(Unbounded Data) であり、これは連続的な入力です。つまり、計算のためにデータの全量を取得することはできません。同時に計算結果も連続的に出力され、最終結果ではなく、ある瞬間の結果のみが得られます。

Kafka Streams は、Kafka に保存されているデータを処理および分析するためのクライアント ライブラリです。これは、イベント時間と処理時間を区別する方法、ウィンドウ処理のサポート、シンプルで効率的な管理、アプリケーション ステータスのリアルタイム クエリなど、ストリーム処理のいくつかの重要な概念に基づいて構築されています。

Kafka Streams の敷居は非常に低く、通常の Kafka メッセージ処理プログラムの作成とほとんど変わらず、マルチプロセス展開による容量拡張、負荷分散、高可用性 (Kafka Consumer の並列モデル) を実現できます。

Kafka Streams の機能の一部:

  • あらゆるJavaアプリケーションに統合できるシンプルで軽量なクライアントライブラリとして設計されています
  • Kafka 以外の追加の依存関係はなく、水平拡張をサポートし、連続性を保証するために Kafka のパーティショニング モデルが使用されます。
  • フォールトトレラントな状態ストレージによる効率的なステートフル操作(ウィンドウ結合と集計)
  • 正確に1回だけのセマンティクスをサポート
  • レコードレベルの処理をサポートし、ミリ秒レベルのレイテンシを実現
  • 高レベルストリームDSLと低レベルプロセッサAPIを提供

ストリーム処理トポロジ

  • ストリームは、Kafka Streams によって提供される最も重要な抽象化であり、無制限で継続的に更新されるデータセットを表します。ストリームは、不変のデータ レコードの順序付けされた、再生可能でフォールト トレラントなシーケンスであり、データ レコードはキーと値のペアとして定義されます。
  • ストリーム処理アプリケーションは、Kafka Streams ライブラリを使用するアプリケーションです。これは、プロセッサ トポロジを通じて計算ロジックを定義します。各プロセッサ トポロジは、ストリームを介した複数のストリーム プロセッサ (ノード) のグラフです。
  • ストリーム プロセッサは、プロセッサ トポロジ内のノードです。これは、トポロジ内の上流プロセッサから一度に 1 つの入力レコードを受信し、そのレコードに操作を適用してストリーム内のデータを変換し、その後、下流プロセッサに 1 つ以上の出力レコードを生成する処理ステップを表します。

特別なプロセッサが 2 つあります。

ソース プロセッサは、アップストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。 1 つ以上の Kafka トピックからレコードを消費し、下流のプロセッサに転送することで、トポロジの入力ストリームを生成します。

シンク プロセッサ シンク プロセッサは、ダウンストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。上流プロセッサから受信したレコードを指定された kafka トピックに送信します。


関連するコアコンセプトについては、以下のリンクを参照してください。


以下はSpringbootでのKafka Streamの適用例です。

頼る

  1. <依存関係>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </依存関係>
  5. <依存関係>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>スプリングカフカ</artifactId>
  8. </依存関係>
  9. <依存関係>
  10. <グループ ID>org.apache.kafka</グループ ID>
  11. <artifactId>kafka-streams</artifactId>
  12. </依存関係>

構成

  1. サーバ:
  2. ポート: 9090
  3. 春:
  4. 応用:
  5. 名前: kafka-demo
  6. カフカ:
  7. ストリーム:
  8. アプリケーションID : ${spring.application.name }
  9. プロパティ:
  10. spring.json.trusted.packages: '*'  
  11. ブートストラップサーバー:
  12. - ローカルホスト:9092
  13. - ローカルホスト:9093
  14. - ローカルホスト:9094
  15. プロデューサー:
  16. 回答: 1
  17. 再試行: 10
  18. キー-serializer: org.apache.kafka.common.serialization.StringSerializer
  19. 値シリアライザー: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
  20. プロパティ:
  21. spring.json.trusted.packages: '*'  
  22. 消費者:
  23. キー-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  24. 値デシリアライザー: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
  25. 自動コミットを有効にする: false  
  26. グループID: ConsumerTest
  27. 自動オフセットリセット: 最新
  28. プロパティ:
  29. セッションタイムアウト: 12000
  30. ハートビート間隔: 3000
  31. 最大投票レコード数: 100
  32. spring.json.trusted.packages: '*'  
  33. リスナー:
  34. ack-mode: 手動即時
  35. タイプ: バッチ
  36. 同時実行数: 8
  37. プロパティ:
  38. 最大ポーリング間隔(ミリ秒) : 300000

メッセージ送信

  1. @サービス
  2. パブリッククラスMessageSend {
  3. @リソース
  4. private KafkaTemplate<String, Message> kafkaTemplate;
  5. パブリックvoid sendMessage2(メッセージ メッセージ) {
  6. kafkaTemplate.send(新しい ProducerRecord<String, Message>( "test" 、 message)).addCallback(結果 -> {
  7. システム。 out .println( "実行が成功しました..." + Thread.currentThread().getName()) ;
  8. }, 例 -> {
  9. システム。 out .println( "実行に失敗しました" );
  10. 例:printStackTrace();
  11. });
  12. }
  13. }

メッセージ監視

  1. @KafkaListener(トピック = { "テスト" })
  2. パブリックvoid listener2(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  3. ( ConsumerRecord<String, Message> レコード: レコード) {
  4. システム。 out .println(this.getClass().hashCode() + ", スレッド" + Thread.currentThread().getName() + ", キー: " + record.key ( ) + ", 受信メッセージ: " + record.value() + ", パーティション: " + record.partition() + ", オフセット: " + record.offset());
  5. }
  6. 試す {
  7. TimeUnit.SECONDS.sleep(0);
  8. } キャッチ (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. ack.acknowledge() ;
  12. }
  13.      
  14. @KafkaListener(トピック = { "デモ" })
  15. パブリックvoid listenerDemo(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  16. ( ConsumerRecord<String, Message> レコード: レコード) {
  17. システム。 out .println( "デモトピック: " + this.getClass().hashCode() + "、スレッド" + Thread.currentThread().getName() + "、キー: " + record.key ( ) + "、受信メッセージ: " + record.value() + "、パーティション: " + record.partition() + "、オフセット: " + record.offset());
  18. }
  19. ack.acknowledge() ;
  20. }

Kafka ストリーム処理

メッセージの変換と他のトピックへの転送

  1. @ビーン
  2. パブリックKStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
  3. KStream<Object, Object> ストリーム = streamsBuilder.stream( "テスト" );
  4. stream.map((キー、値) -> {
  5. システム。 out .println( "元のメッセージの内容: " + new String((byte[]) value, Charset.forName( "UTF-8" ))) ;
  6. return new KeyValue<>( key , "{\"title\": \"123123\", \"message\": \"コンテンツを再定義\"}" .getBytes(Charset.forName( "UTF-8" ))) ;
  7. })。 「デモ」
  8. ストリームを返します
  9. }

実行結果:


ストリームオブジェクトの処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream4(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.map((キー、値) -> {
  8. value.setTitle( "XXXXXXX" );
  9. 新しいKeyValue<>(キー、値)を返します
  10. })。 to ( "demo" (Serdes.String()、jsonSerde)を使用して生成されます) ;
  11. ストリームを返します
  12. }

実行結果:


グループ処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream5(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .count () 関数
  15. .toStream().print(Printed.toSysOut());
  16. ストリームを返します
  17. }

実行結果:


重合

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream6(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream().print(Printed.toSysOut());
  19. ストリームを返します
  20. }

実行結果:


フィルター データをフィルター

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream7(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream()
  19. .filter((キー, 値) -> ! "2" .equals(キー))
  20. .print(Printed.toSysOut());
  21. ストリームを返します
  22. }

実行結果:


フィルターキーが「2」と等しくありません

ブランチマルチストリーム処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream8(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. // ブランチ、マルチストリーム処理
  8. KStream<文字列、メッセージ>[] arrStream = stream.branch(
  9. (キー, 値 ) -> "男性" .equals(value.getSex()),
  10. (キー、値) -> "女性" .equals(value.getSex()));
  11. ストリーム。 ( arrStream ).forEach(-> {
  12. .foreach ((キー、メッセージ) -> {
  13. システム。出力.println(Thread.currentThread().getName() + ", キー = " +キー+ ", メッセージ = " + メッセージ);
  14. });
  15. });
  16. ストリームを返します
  17. }

実行結果:


複数フィールドのグループ化

複数のselectKeyを使用することはできません。後者は前者を上書きします。

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStreamM2(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. ストリーム
  8. .selectKey(新しいKeyValueMapper<文字列、メッセージ、文字列>() {
  9. @オーバーライド
  10. パブリック文字列適用(文字列キー、メッセージ値) {
  11. System.out .println (Thread.currentThread().getName());
  12. 戻り値.getTime() + " | " + value.getOrgCode() ;
  13. }
  14. })
  15. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  16. .count () 関数
  17. .toStream().print(Printed.toSysOut());
  18. ストリームを返します
  19. }

実行結果:


<<:  我が国の政府クラウド開発の現状と動向の分析

>>:  クラウドネイティブの進化のトレンドにおける従来のデータベースアップグレードの実践

推薦する

HTML5帝国の台頭に関する徹底分析:ワーテルローの後、離陸の準備は整った

1月22日ニュース(楊暁)インターネットが急速に発展し、マルチメディアアプリケーションが爆発的に増加...

LeYiCloud-香港VPS/クラウドホスト/サーバー/BGP+CN2/最大150Gbps防御

Leiyi Cloud は、香港データセンター + 高防御ホスト事業 (最大 150Gbps の D...

budgetvmはどうですか?シカゴデータセンターの専用サーバーの簡単な評価

budgetvm は独立サーバー事業を主に運営しており、各独立サーバーに 1Gbps の帯域幅と無制...

Baiduウェブマスタープラットフォームが「ウェブ検索ホワイトペーパー」を発表 袁芳さん、どう思いますか

現在、すべての主要なウェブマスタープラットフォームは、Baiduプラットフォームが「ウェブ検索ホワイ...

Tripodcloud: 無制限のトラフィック cn2 gia vps 年間支払いは月額 3.33 ドルから | IP 変更は無料

CN2 GIA ネットワーク上で無制限のトラフィックを備えた VPS として tripodcloud...

IOE を排除するための 9 年間の戦い: OceanBase がダークホースとして浮上した経緯を詳しく見る

過去 10 年間、クラウド コンピューティングによってもたらされた変化により、従来の IOE アーキ...

クラウド ネイティブとは実際には何を意味するのでしょうか?

多くの場合、クラウド ネイティブに関する会話では、コンテナ化やマイクロサービスなどのテクノロジの選択...

パンデミックがデスクトップ仮想化のトレンドを牽引している理由

在宅勤務モデルに移行する企業が増えるにつれ、デスクトップ仮想化 (VDI) の開発が促進されています...

ウェブサイトにキーワードランキングがない場合、どこからトラフィックを獲得できますか?

サイトの重みが増すほど、サイトのキーワードランキングが高くなり、検索エンジンからのトラフィックが増え...

分散トランザクションの簡単な分析と簡単な実装

分散システムでは、データの高可用性を確保するために、通常、データの複数のコピー (レプリカ) を保持...

SEO品質の外部リンク:リンクを取得するのが難しいほど、価値が高くなります

みなさんこんにちは。Smelling the Roseです。 SEO 最適化を行うほとんどの人は、外...

iPaaS·EasyFunctionは、多言語サポートとイベント駆動型を備えた完全に管理されたサーバーレス機能コンピューティングサービスプラットフォームです。

著者|プラットフォーム事業研究開発部・応用事業製品研究開発部 バベルチームの朱志国、王志群、趙漢、戴...

コンテナの利点はソフトウェアベースのネットワークに浸透している

コンテナは、IT およびネットワーク アプリケーション開発に大きな影響を与える新しいテクノロジーです...

ミニプログラムとARが出会うと、ブラックテクノロジーマーケティングもこのように展開される

月収10万元の起業の夢を実現するミニプログラム起業支援プラン[Lieyun.com(WeChat:i...

従来のストレージと分散ストレージの対立

1. 従来のストレージシステムの過去と現在1. 途中のストレージハードウェア従来のストレージ システ...