SpringbootはKafka Streamのリアルタイム統計を統合します

SpringbootはKafka Streamのリアルタイム統計を統合します

[[417927]]

環境: springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka ストリームの紹介

Kafka はバージョン 0.10 で Stream API をリリースしました。これにより、Kafka に保存されているデータに対してストリーム処理と分析を実行する機能が提供されます。

ストリーミング コンピューティングは一般にバッチ コンピューティングと比較されます。バッチ コンピューティングでは、多くの場合、固定データ セットを入力として受け取り、結果を計算します。ストリーミング コンピューティングの入力は多くの場合「無制限」(Unbounded Data) であり、これは連続的な入力です。つまり、計算のためにデータの全量を取得することはできません。同時に計算結果も連続的に出力され、最終結果ではなく、ある瞬間の結果のみが得られます。

Kafka Streams は、Kafka に保存されているデータを処理および分析するためのクライアント ライブラリです。これは、イベント時間と処理時間を区別する方法、ウィンドウ処理のサポート、シンプルで効率的な管理、アプリケーション ステータスのリアルタイム クエリなど、ストリーム処理のいくつかの重要な概念に基づいて構築されています。

Kafka Streams の敷居は非常に低く、通常の Kafka メッセージ処理プログラムの作成とほとんど変わらず、マルチプロセス展開による容量拡張、負荷分散、高可用性 (Kafka Consumer の並列モデル) を実現できます。

Kafka Streams の機能の一部:

  • あらゆるJavaアプリケーションに統合できるシンプルで軽量なクライアントライブラリとして設計されています
  • Kafka 以外の追加の依存関係はなく、水平拡張をサポートし、連続性を保証するために Kafka のパーティショニング モデルが使用されます。
  • フォールトトレラントな状態ストレージによる効率的なステートフル操作(ウィンドウ結合と集計)
  • 正確に1回だけのセマンティクスをサポート
  • レコードレベルの処理をサポートし、ミリ秒レベルのレイテンシを実現
  • 高レベルストリームDSLと低レベルプロセッサAPIを提供

ストリーム処理トポロジ

  • ストリームは、Kafka Streams によって提供される最も重要な抽象化であり、無制限で継続的に更新されるデータセットを表します。ストリームは、不変のデータ レコードの順序付けされた、再生可能でフォールト トレラントなシーケンスであり、データ レコードはキーと値のペアとして定義されます。
  • ストリーム処理アプリケーションは、Kafka Streams ライブラリを使用するアプリケーションです。これは、プロセッサ トポロジを通じて計算ロジックを定義します。各プロセッサ トポロジは、ストリームを介した複数のストリーム プロセッサ (ノード) のグラフです。
  • ストリーム プロセッサは、プロセッサ トポロジ内のノードです。これは、トポロジ内の上流プロセッサから一度に 1 つの入力レコードを受信し、そのレコードに操作を適用してストリーム内のデータを変換し、その後、下流プロセッサに 1 つ以上の出力レコードを生成する処理ステップを表します。

特別なプロセッサが 2 つあります。

ソース プロセッサは、アップストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。 1 つ以上の Kafka トピックからレコードを消費し、下流のプロセッサに転送することで、トポロジの入力ストリームを生成します。

シンク プロセッサ シンク プロセッサは、ダウンストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。上流プロセッサから受信したレコードを指定された kafka トピックに送信します。


関連するコアコンセプトについては、以下のリンクを参照してください。


以下はSpringbootでのKafka Streamの適用例です。

頼る

  1. <依存関係>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </依存関係>
  5. <依存関係>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>スプリングカフカ</artifactId>
  8. </依存関係>
  9. <依存関係>
  10. <グループ ID>org.apache.kafka</グループ ID>
  11. <artifactId>kafka-streams</artifactId>
  12. </依存関係>

構成

  1. サーバ:
  2. ポート: 9090
  3. 春:
  4. 応用:
  5. 名前: kafka-demo
  6. カフカ:
  7. ストリーム:
  8. アプリケーションID : ${spring.application.name }
  9. プロパティ:
  10. spring.json.trusted.packages: '*'  
  11. ブートストラップサーバー:
  12. - ローカルホスト:9092
  13. - ローカルホスト:9093
  14. - ローカルホスト:9094
  15. プロデューサー:
  16. 回答: 1
  17. 再試行: 10
  18. キー-serializer: org.apache.kafka.common.serialization.StringSerializer
  19. 値シリアライザー: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
  20. プロパティ:
  21. spring.json.trusted.packages: '*'  
  22. 消費者:
  23. キー-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  24. 値デシリアライザー: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
  25. 自動コミットを有効にする: false  
  26. グループID: ConsumerTest
  27. 自動オフセットリセット: 最新
  28. プロパティ:
  29. セッションタイムアウト: 12000
  30. ハートビート間隔: 3000
  31. 最大投票レコード数: 100
  32. spring.json.trusted.packages: '*'  
  33. リスナー:
  34. ack-mode: 手動即時
  35. タイプ: バッチ
  36. 同時実行数: 8
  37. プロパティ:
  38. 最大ポーリング間隔(ミリ秒) : 300000

メッセージ送信

  1. @サービス
  2. パブリッククラスMessageSend {
  3. @リソース
  4. private KafkaTemplate<String, Message> kafkaTemplate;
  5. パブリックvoid sendMessage2(メッセージ メッセージ) {
  6. kafkaTemplate.send(新しい ProducerRecord<String, Message>( "test" 、 message)).addCallback(結果 -> {
  7. システム。 out .println( "実行が成功しました..." + Thread.currentThread().getName()) ;
  8. }, 例 -> {
  9. システム。 out .println( "実行に失敗しました" );
  10. 例:printStackTrace();
  11. });
  12. }
  13. }

メッセージ監視

  1. @KafkaListener(トピック = { "テスト" })
  2. パブリックvoid listener2(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  3. ( ConsumerRecord<String, Message> レコード: レコード) {
  4. システム。 out .println(this.getClass().hashCode() + ", スレッド" + Thread.currentThread().getName() + ", キー: " + record.key ( ) + ", 受信メッセージ: " + record.value() + ", パーティション: " + record.partition() + ", オフセット: " + record.offset());
  5. }
  6. 試す {
  7. TimeUnit.SECONDS.sleep(0);
  8. } キャッチ (InterruptedException e) {
  9. e.printStackTrace();
  10. }
  11. ack.acknowledge() ;
  12. }
  13.      
  14. @KafkaListener(トピック = { "デモ" })
  15. パブリックvoid listenerDemo(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
  16. ( ConsumerRecord<String, Message> レコード: レコード) {
  17. システム。 out .println( "デモトピック: " + this.getClass().hashCode() + "、スレッド" + Thread.currentThread().getName() + "、キー: " + record.key ( ) + "、受信メッセージ: " + record.value() + "、パーティション: " + record.partition() + "、オフセット: " + record.offset());
  18. }
  19. ack.acknowledge() ;
  20. }

Kafka ストリーム処理

メッセージの変換と他のトピックへの転送

  1. @ビーン
  2. パブリックKStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
  3. KStream<Object, Object> ストリーム = streamsBuilder.stream( "テスト" );
  4. stream.map((キー、値) -> {
  5. システム。 out .println( "元のメッセージの内容: " + new String((byte[]) value, Charset.forName( "UTF-8" ))) ;
  6. return new KeyValue<>( key , "{\"title\": \"123123\", \"message\": \"コンテンツを再定義\"}" .getBytes(Charset.forName( "UTF-8" ))) ;
  7. })。 「デモ」
  8. ストリームを返します
  9. }

実行結果:


ストリームオブジェクトの処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream4(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.map((キー、値) -> {
  8. value.setTitle( "XXXXXXX" );
  9. 新しいKeyValue<>(キー、値)を返します
  10. })。 to ( "demo" (Serdes.String()、jsonSerde)を使用して生成されます) ;
  11. ストリームを返します
  12. }

実行結果:


グループ処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream5(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .count () 関数
  15. .toStream().print(Printed.toSysOut());
  16. ストリームを返します
  17. }

実行結果:


重合

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream6(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream().print(Printed.toSysOut());
  19. ストリームを返します
  20. }

実行結果:


フィルター データをフィルター

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream7(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
  8. @オーバーライド
  9. パブリック文字列適用(文字列キー、メッセージ値) {
  10. 戻り値.getOrgCode();
  11. }
  12. })
  13. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  14. .aggregate(() -> 0L, (キー、値、aggValue) -> {
  15. システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
  16. aggValue + 1を返します
  17. }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
  18. .toStream()
  19. .filter((キー, 値) -> ! "2" .equals(キー))
  20. .print(Printed.toSysOut());
  21. ストリームを返します
  22. }

実行結果:


フィルターキーが「2」と等しくありません

ブランチマルチストリーム処理

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStream8(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. // ブランチ、マルチストリーム処理
  8. KStream<文字列、メッセージ>[] arrStream = stream.branch(
  9. (キー, 値 ) -> "男性" .equals(value.getSex()),
  10. (キー、値) -> "女性" .equals(value.getSex()));
  11. ストリーム。 ( arrStream ).forEach(-> {
  12. .foreach ((キー、メッセージ) -> {
  13. システム。出力.println(Thread.currentThread().getName() + ", キー = " +キー+ ", メッセージ = " + メッセージ);
  14. });
  15. });
  16. ストリームを返します
  17. }

実行結果:


複数フィールドのグループ化

複数のselectKeyを使用することはできません。後者は前者を上書きします。

  1. @ビーン
  2. パブリックKStream<文字列、メッセージ> kStreamM2(StreamsBuilder streamsBuilder) {
  3. JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
  4. JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
  5. descri.addTrustedPackages( "*" );
  6. KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
  7. ストリーム
  8. .selectKey(新しいKeyValueMapper<文字列、メッセージ、文字列>() {
  9. @オーバーライド
  10. パブリック文字列適用(文字列キー、メッセージ値) {
  11. System.out .println (Thread.currentThread().getName());
  12. 戻り値.getTime() + " | " + value.getOrgCode() ;
  13. }
  14. })
  15. .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
  16. .count () 関数
  17. .toStream().print(Printed.toSysOut());
  18. ストリームを返します
  19. }

実行結果:


<<:  我が国の政府クラウド開発の現状と動向の分析

>>:  クラウドネイティブの進化のトレンドにおける従来のデータベースアップグレードの実践

推薦する

NiuBo.comは数年間沈黙していたが、Tmallになった。羅永浩はWanwangの無策を激しく批判した。

2012年7月14日午後3時30分頃、NiuBo.comの創設者であるLuo Yonghao氏がWe...

年末レビュー: 2023 年最大のクラウド障害 15 件

重要なビジネスプロセスを実行する上でクラウド プラットフォーム テクノロジーがますます重要になるにつ...

中小企業にとってSEOが効果がない理由の分析

「SEOプロモーションを実施したのですが、結果が期待通りでなく、上司も認めてくれませんでした。」ホー...

「熊張豪 SEOガイド1.0」簡易版!

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています最近、百度...

Baidu SEO 最適化スキルを活用して Taobao で上位にランクインする方法

タオバオの大手は、タオバオのランキングは百度よりもはるかに複雑だと言っていました。私はそれを信じなか...

ハイブリッドクラウドの時代において、Unicloudは企業にとって「クラウド」の管理を容易にします

[51CTO.com からのオリジナル記事] クラウド コンピューティングがますます多くの業界で導入...

catalysthost-年間 12 ドル/128 MB RAM/10 GB ハード ドライブ/10 TB トラフィック/2 GB ポート

catalysthost.com は、年にほとんどプロモーションを行いません。現在、ダラス データ ...

NodeServ – 年間 30 ドル / 500g DDOS 保護 / 256M メモリ / 50G ハードディスク / 500G トラフィック

NodeServ.com は、DDOS 保護を備えた VPS の提供を開始したと発表しました。ネット...

デザイン思考: ユーザーのニーズを満たすことだけを考えるのではなく

[編集者注] この記事の著者は @一只土贼 です。デザイン思考はデザインコンセプトです。その焦点はも...

pumpcloud: マカオ VPS の再入荷、マカオ テレコム、500Mbps 帯域幅、超大規模トラフィック、市場では珍しい

マカオ VPS は VPS 市場で入手するのが非常に難しく、リソースが不足し、高価です。ここでは、p...

ハイブリッドクラウドワークを導入するために必要な 5 つのスキル

パンデミックによって私たちの働き方、時間、場所が再定義された 2020 年初頭には、ハイブリッド ク...

フォレスターの最新レポート:アリババクラウドが初めてグローバルコンテナリーダークアドラントにランクイン

1 月 21 日、国際的に権威のあるコンサルティング組織である Forrester が、グローバル ...

Baidu 製品を使用してウェブサイトの重みと評判を向上させることについて説明します

実際、多くの友人のサイトのトラフィックは主に百度に依存していることは誰もが知っているので、百度でのラ...

#無制限トラフィックサーバー# sharktech-$89/X3470/12gメモリ/1.5Tハードディスク/1Gbps/デンバー

2003年から運営されているSharktechは、新年から特別低価格サーバーの提供を開始しました。1...