SpringbootはKafka Streamのリアルタイム統計を統合します

SpringbootはKafka Streamのリアルタイム統計を統合します

[[417927]]

環境: springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka ストリームの紹介

Kafka はバージョン 0.10 で Stream API をリリースしました。これにより、Kafka に保存されているデータに対してストリーム処理と分析を実行する機能が提供されます。

ストリーミング コンピューティングは一般にバッチ コンピューティングと比較されます。バッチ コンピューティングでは、多くの場合、固定データ セットを入力として受け取り、結果を計算します。ストリーミング コンピューティングの入力は多くの場合「無制限」(Unbounded Data) であり、これは連続的な入力です。つまり、計算のためにデータの全量を取得することはできません。同時に計算結果も連続的に出力され、最終結果ではなく、ある瞬間の結果のみが得られます。

Kafka Streams は、Kafka に保存されているデータを処理および分析するためのクライアント ライブラリです。これは、イベント時間と処理時間を区別する方法、ウィンドウ処理のサポート、シンプルで効率的な管理、アプリケーション ステータスのリアルタイム クエリなど、ストリーム処理のいくつかの重要な概念に基づいて構築されています。

Kafka Streams の敷居は非常に低く、通常の Kafka メッセージ処理プログラムの作成とほとんど変わらず、マルチプロセス展開による容量拡張、負荷分散、高可用性 (Kafka Consumer の並列モデル) を実現できます。

Kafka Streams の機能の一部:

  • あらゆるJavaアプリケーションに統合できるシンプルで軽量なクライアントライブラリとして設計されています
  • Kafka 以外の追加の依存関係はなく、水平拡張をサポートし、連続性を保証するために Kafka のパーティショニング モデルが使用されます。
  • フォールトトレラントな状態ストレージによる効率的なステートフル操作(ウィンドウ結合と集計)
  • 正確に1回だけのセマンティクスをサポート
  • レコードレベルの処理をサポートし、ミリ秒レベルのレイテンシを実現
  • 高レベルストリームDSLと低レベルプロセッサAPIを提供

ストリーム処理トポロジ

  • ストリームは、Kafka Streams によって提供される最も重要な抽象化であり、無制限で継続的に更新されるデータセットを表します。ストリームは、不変のデータ レコードの順序付けされた、再生可能でフォールト トレラントなシーケンスであり、データ レコードはキーと値のペアとして定義されます。
  • ストリーム処理アプリケーションは、Kafka Streams ライブラリを使用するアプリケーションです。これは、プロセッサ トポロジを通じて計算ロジックを定義します。各プロセッサ トポロジは、ストリームを介した複数のストリーム プロセッサ (ノード) のグラフです。
  • ストリーム プロセッサは、プロセッサ トポロジ内のノードです。これは、トポロジ内の上流プロセッサから一度に 1 つの入力レコードを受信し、そのレコードに操作を適用してストリーム内のデータを変換し、その後、下流プロセッサに 1 つ以上の出力レコードを生成する処理ステップを表します。

特別なプロセッサが 2 つあります。

ソース プロセッサは、アップストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。 1 つ以上の Kafka トピックからレコードを消費し、下流のプロセッサに転送することで、トポロジの入力ストリームを生成します。

シンク プロセッサ シンク プロセッサは、ダウンストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。上流プロセッサから受信したレコードを指定された kafka トピックに送信します。


関連するコアコンセプトについては、以下のリンクを参照してください。


以下はSpringbootでのKafka Streamの適用例です。

頼る

  1. <依存関係>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </依存関係>
  5. <依存関係>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>スプリングカフカ</artifactId>
  8. </依存関係>
  9. <依存関係>
  10. <グループ ID>org.apache.kafka</グループ ID>
  11. <artifactId>kafka-streams</artifactId>
  12. </依存関係>

構成

  1. サーバ:
  2. ポート: 9090
  3. 春:
  4. 応用:
  5. 名前: kafka-demo
  6. カフカ:
  7. ストリーム:
  8. アプリケーションID : ${spring.application.name }
  9. プロパティ:
  10. spring.json.trusted.packages: '*'  
  11. ブートストラップサーバー:
  12. - ローカルホスト:9092
  13. - ローカルホスト:9093
  14. - ローカルホスト:9094
  15. プロデューサー:
  16. 回答: 1
  17. 再試行: 10
  18. キー-serializer: org.apache.kafka.common.serialization.StringSerializer
  19. 値シリアライザー: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
  20. プロパティ:
  21. spring.json.trusted.packages: '*'  
  22. 消費者:
  23. キー-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  24. 値デシリアライザー: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
  25. 自動コミットを有効にする: false  
  26. グループID: ConsumerTest
  27. 自動オフセットリセット: 最新
  28. プロパティ:
  29. セッションタイムアウト: 12000
  30. ハートビート間隔: 3000
  31. 最大投票レコード数: 100
  32. spring.json.trusted.packages: '*'  
  33. リスナー:
  34. ack-mode: 手動即時
  35. タイプ: バッチ
  36. 同時実行数: 8
  37. プロパティ:
  38. 最大ポーリング間隔(ミリ秒) : 300000

メッセージ送信

  1. @サービス
  2. パブリッククラスMessageSend {
  3. @リソース
  4. private KafkaTemplate<String, Message> kafkaTemplate;
  5. パブリックvoid sendMessage2(メッセージ メッセージ) {
  6. kafkaTemplate.send(新しい ProducerRecord<String, Message>( "test" 、 message)).addCallback(結果 -> {
  7. システム。 out .println( "実行が成功しました..." + Thread.currentThread().getName()) ;
  8. }, 例 -> {
  9. システム。 out .println( "実行に失敗しました" );
  10. 例:printStackTrace();
  11. });
  12. }
  13. }

メッセージ監視

  1. @KafkaListener(トピック = { "テスト" })
  2. パブリックvoid listener2(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  3. ( ConsumerRecord<String, Message> レコード: レコード) {
  4. システム。 out .println(this.getClass().hashCode() + ", スレッド" + Thread.currentThread().getName() + ", キー: " + record.key ( ) + ", 受信メッセージ: " + record.value() + ", パーティション: " + record.partition() + ", オフセット: " + record.offset());
  5. }
  6. 試す {
  7. TimeUnit.SECONDS.sleep(0);
  8. } キャッチ (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. ack.acknowledge() ;
  12. }
  13.      
  14. @KafkaListener(トピック = { "デモ" })
  15. パブリックvoid listenerDemo(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  16. ( ConsumerRecord<String, Message> レコード: レコード) {
  17. システム。 out .println( "デモトピック: " + this.getClass().hashCode() + "、スレッド" + Thread.currentThread().getName() + "、キー: " + record.key ( ) + "、受信メッセージ: " + record.value() + "、パーティション: " + record.partition() + "、オフセット: " + record.offset());
  18. }
  19. ack.acknowledge() ;
  20. }

Kafka ストリーム処理

メッセージの変換と他のトピックへの転送

  1. @ビーン
  2. パブリックKStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
  3. KStream<Object, Object> ストリーム = streamsBuilder.stream( "テスト" );
  4. stream.map((キー、値) -> {
  5. システム。 out .println( "元のメッセージの内容: " + new String((byte[]) value, Charset.forName( "UTF-8" ))) ;
  6. return new KeyValue<>( key , "{\"title\": \"123123\", \"message\": \"コンテンツを再定義\"}" .getBytes(Charset.forName( "UTF-8" ))) ;
  7. })。 「デモ」
  8. ストリームを返します
  9. }

実行結果:


ストリームオブジェクトの処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream4(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.map((キー、値) -> {
  8. value.setTitle( "XXXXXXX" );
  9. 新しいKeyValue<>(キー、値)を返します
  10. })。 to ( "demo" (Serdes.String()、jsonSerde)を使用して生成されます) ;
  11. ストリームを返します
  12. }

実行結果:


グループ処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream5(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .count () 関数
  15. .toStream().print(Printed.toSysOut());
  16. ストリームを返します
  17. }

実行結果:


重合

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream6(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream().print(Printed.toSysOut());
  19. ストリームを返します
  20. }

実行結果:


フィルター データをフィルター

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream7(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream()
  19. .filter((キー, 値) -> ! "2" .equals(キー))
  20. .print(Printed.toSysOut());
  21. ストリームを返します
  22. }

実行結果:


フィルターキーが「2」と等しくありません

ブランチマルチストリーム処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream8(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. // ブランチ、マルチストリーム処理
  8. KStream<文字列、メッセージ>[] arrStream = stream.branch(
  9. (キー, 値 ) -> "男性" .equals(value.getSex()),
  10. (キー、値) -> "女性" .equals(value.getSex()));
  11. ストリーム。 ( arrStream ).forEach(-> {
  12. .foreach ((キー、メッセージ) -> {
  13. システム。出力.println(Thread.currentThread().getName() + ", キー = " +キー+ ", メッセージ = " + メッセージ);
  14. });
  15. });
  16. ストリームを返します
  17. }

実行結果:


複数フィールドのグループ化

複数のselectKeyを使用することはできません。後者は前者を上書きします。

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStreamM2(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. ストリーム
  8. .selectKey(新しいKeyValueMapper<文字列、メッセージ、文字列>() {
  9. @オーバーライド
  10. パブリック文字列適用(文字列キー、メッセージ値) {
  11. System.out .println (Thread.currentThread().getName());
  12. 戻り値.getTime() + " | " + value.getOrgCode() ;
  13. }
  14. })
  15. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  16. .count () 関数
  17. .toStream().print(Printed.toSysOut());
  18. ストリームを返します
  19. }

実行結果:


<<:  我が国の政府クラウド開発の現状と動向の分析

>>:  クラウドネイティブの進化のトレンドにおける従来のデータベースアップグレードの実践

推薦する

Robots.txt プロトコル標準の概要

最近、多くのウェブマスターから「robots.txt」ファイルを正しく設定する方法について質問を受け...

温かみのあるテクノロジーがデジタル中国の未来を描く、景東が清明節にアリペイ版「河畔」を披露

9月20日、技術専門家が集まったATECメインフォーラムで、何千年もの間旅をしてきた古代の絵画が「生...

偽造品や偽購入代理店と戦うWeChatモーメント「オペレーション・サンダー」

テンセントは5月16日、最近新たな偽造品対策作戦を開始し、特にWeChatパブリックアカウントとモー...

推奨: lfchosting-$6.5/月/Xen/4 コア/1g メモリ/15SSD/1T トラフィック

4年後、lfchostingはついにトレンドに従い、新しいSSDハードドライブVPSを導入し、いくつ...

Baiduアルゴリズム調整後のウェブサイトのオリジナルコンテンツと転載率を調整する方法

ウェブサイトのコンテンツの更新は時間のかかる作業です。すべてのコンテンツをオリジナルにすることは不可...

オリジナルコンテンツ:大学におけるWeChatパブリックアカウントのチーム構築、運用、プロモーション

今日は、大学のパブリックアカウントのチーム構築、運用、プロモーション戦略を分析します。大学生起業家の...

ブランドマーケティング統合パス

モバイルインターネットの徹底的な発展とアルゴリズムおよびビッグデータの精度の幾何学的成長により、通信...

Baidu による旅行ウェブサイトのプロモーションに関する経験と洞察

2012年に旅行会社にSEOスーパーバイザーとして入社してから1年以上が経ちました。いろいろな思いが...

香港沙田CN2: pzea-9 USD/KVM/windows/1G RAM/20G HDD/無制限トラフィック

pzea.com の香港沙田データセンター VPS 補充。すべてのマシンはセルフホストされ、CN2 ...

狂犬にならないで! APについて知っておくべきこと

AP をプッシュすることは、オンラインでお金を稼ぐほとんどの人が最初に行うことです。知識が少なすぎる...

毎日の話題:モバイルインターネットの波でシャンダの「伝説」時代は終わった

A5ウェブマスターネットワーク(www.admin5.com)は4月18日、かつてのインターネット大...

オープンソースのリスクと課題を正しく検証し、冷静にクラウドに移行するにはどうすればよいでしょうか。

[51CTO.comからのオリジナル記事] 近年、オープンソースの概念は中国のビジネス界や開発者に広...

ゲーム開発経験の概要: 分散アーキテクチャ、データベース、プロセス設計

ゲームをレーシングカーと考えると、ゲーム開発はエンジンとして重要な役割を果たし、プロット、レベル、リ...

トレンド |仮想化は負け、クラウド、SDN、SD-WAN が C の位置を獲得しました。

企業のネットワーク管理者にとって、IT の最大の焦点はクラウド コンピューティングとソフトウェア定義...