[[417927]] 環境: springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2 Kafka ストリームの紹介Kafka はバージョン 0.10 で Stream API をリリースしました。これにより、Kafka に保存されているデータに対してストリーム処理と分析を実行する機能が提供されます。 ストリーミング コンピューティングは一般にバッチ コンピューティングと比較されます。バッチ コンピューティングでは、多くの場合、固定データ セットを入力として受け取り、結果を計算します。ストリーミング コンピューティングの入力は多くの場合「無制限」(Unbounded Data) であり、これは連続的な入力です。つまり、計算のためにデータの全量を取得することはできません。同時に計算結果も連続的に出力され、最終結果ではなく、ある瞬間の結果のみが得られます。 Kafka Streams は、Kafka に保存されているデータを処理および分析するためのクライアント ライブラリです。これは、イベント時間と処理時間を区別する方法、ウィンドウ処理のサポート、シンプルで効率的な管理、アプリケーション ステータスのリアルタイム クエリなど、ストリーム処理のいくつかの重要な概念に基づいて構築されています。 Kafka Streams の敷居は非常に低く、通常の Kafka メッセージ処理プログラムの作成とほとんど変わらず、マルチプロセス展開による容量拡張、負荷分散、高可用性 (Kafka Consumer の並列モデル) を実現できます。 Kafka Streams の機能の一部: - あらゆるJavaアプリケーションに統合できるシンプルで軽量なクライアントライブラリとして設計されています
- Kafka 以外の追加の依存関係はなく、水平拡張をサポートし、連続性を保証するために Kafka のパーティショニング モデルが使用されます。
- フォールトトレラントな状態ストレージによる効率的なステートフル操作(ウィンドウ結合と集計)
- 正確に1回だけのセマンティクスをサポート
- レコードレベルの処理をサポートし、ミリ秒レベルのレイテンシを実現
- 高レベルストリームDSLと低レベルプロセッサAPIを提供
ストリーム処理トポロジ- ストリームは、Kafka Streams によって提供される最も重要な抽象化であり、無制限で継続的に更新されるデータセットを表します。ストリームは、不変のデータ レコードの順序付けされた、再生可能でフォールト トレラントなシーケンスであり、データ レコードはキーと値のペアとして定義されます。
- ストリーム処理アプリケーションは、Kafka Streams ライブラリを使用するアプリケーションです。これは、プロセッサ トポロジを通じて計算ロジックを定義します。各プロセッサ トポロジは、ストリームを介した複数のストリーム プロセッサ (ノード) のグラフです。
- ストリーム プロセッサは、プロセッサ トポロジ内のノードです。これは、トポロジ内の上流プロセッサから一度に 1 つの入力レコードを受信し、そのレコードに操作を適用してストリーム内のデータを変換し、その後、下流プロセッサに 1 つ以上の出力レコードを生成する処理ステップを表します。
特別なプロセッサが 2 つあります。 ソース プロセッサは、アップストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。 1 つ以上の Kafka トピックからレコードを消費し、下流のプロセッサに転送することで、トポロジの入力ストリームを生成します。 シンク プロセッサ シンク プロセッサは、ダウンストリーム プロセッサを持たない特殊なタイプのストリーム プロセッサです。上流プロセッサから受信したレコードを指定された kafka トピックに送信します。
関連するコアコンセプトについては、以下のリンクを参照してください。
以下はSpringbootでのKafka Streamの適用例です。 頼る
- <依存関係>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </依存関係>
- <依存関係>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>スプリングカフカ</artifactId>
- </依存関係>
- <依存関係>
- <グループ ID>org.apache.kafka</グループ ID>
- <artifactId>kafka-streams</artifactId>
- </依存関係>
構成- サーバ:
- ポート: 9090
- 春:
- 応用:
- 名前: kafka-demo
- カフカ:
- ストリーム:
- アプリケーションID : ${spring.application.name }
- プロパティ:
- spring.json.trusted.packages: '*'
- ブートストラップサーバー:
- - ローカルホスト:9092
- - ローカルホスト:9093
- - ローカルホスト:9094
- プロデューサー:
- 回答: 1
- 再試行: 10
- キー-serializer: org.apache.kafka.common.serialization.StringSerializer
- 値シリアライザー: org.springframework.kafka.support.serializer.JsonSerializer #org.apache.kafka.common.serialization.StringSerializer
- プロパティ:
- spring.json.trusted.packages: '*'
- 消費者:
- キー-deserializer: org.apache.kafka.common.serialization.StringDeserializer
- 値デシリアライザー: org.springframework.kafka.support.serializer.JsonDeserializer #org.apache.kafka.common.serialization.StringDeserializer
- 自動コミットを有効にする: false
- グループID: ConsumerTest
- 自動オフセットリセット: 最新
- プロパティ:
- セッションタイムアウト: 12000
- ハートビート間隔: 3000
- 最大投票レコード数: 100
- spring.json.trusted.packages: '*'
- リスナー:
- ack-mode: 手動即時
- タイプ: バッチ
- 同時実行数: 8
- プロパティ:
- 最大ポーリング間隔(ミリ秒) : 300000
メッセージ送信- @サービス
- パブリッククラスMessageSend {
- @リソース
- private KafkaTemplate<String, Message> kafkaTemplate;
- パブリックvoid sendMessage2(メッセージ メッセージ) {
- kafkaTemplate.send(新しい ProducerRecord<String, Message>( "test" 、 message)).addCallback(結果 -> {
- システム。 out .println( "実行が成功しました..." + Thread.currentThread().getName()) ;
- }, 例 -> {
- システム。 out .println( "実行に失敗しました" );
- 例:printStackTrace();
- });
- }
- }
メッセージ監視- @KafkaListener(トピック = { "テスト" })
- パブリックvoid listener2(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
- ( ConsumerRecord<String, Message> レコード: レコード) {
- システム。 out .println(this.getClass().hashCode() + ", スレッド" + Thread.currentThread().getName() + ", キー: " + record.key ( ) + ", 受信メッセージ: " + record.value() + ", パーティション: " + record.partition() + ", オフセット: " + record.offset());
- }
- 試す {
- TimeUnit.SECONDS.sleep(0);
- } キャッチ (InterruptedException e) {
- e.printStackTrace();
- }
- ack.acknowledge() ;
- }
-
- @KafkaListener(トピック = { "デモ" })
- パブリックvoid listenerDemo(List<ConsumerRecord<String, Message>> records, 確認応答 ack) {
- ( ConsumerRecord<String, Message> レコード: レコード) {
- システム。 out .println( "デモトピック: " + this.getClass().hashCode() + "、スレッド" + Thread.currentThread().getName() + "、キー: " + record.key ( ) + "、受信メッセージ: " + record.value() + "、パーティション: " + record.partition() + "、オフセット: " + record.offset());
- }
- ack.acknowledge() ;
- }
Kafka ストリーム処理メッセージの変換と他のトピックへの転送 - @ビーン
- パブリックKStream<Object, Object> kStream(StreamsBuilder streamsBuilder) {
- KStream<Object, Object> ストリーム = streamsBuilder.stream( "テスト" );
- stream.map((キー、値) -> {
- システム。 out .println( "元のメッセージの内容: " + new String((byte[]) value, Charset.forName( "UTF-8" ))) ;
- return new KeyValue<>( key , "{\"title\": \"123123\", \"message\": \"コンテンツを再定義\"}" .getBytes(Charset.forName( "UTF-8" ))) ;
- })。 ( 「デモ」 )に
- ストリームを返します。
- }
実行結果:
ストリームオブジェクトの処理 - @ビーン
- パブリックKStream<文字列、メッセージ> kStream4(StreamsBuilder streamsBuilder) {
- JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
- JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
- descri.addTrustedPackages( "*" );
- KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
- stream.map((キー、値) -> {
- value.setTitle( "XXXXXXX" );
- 新しいKeyValue<>(キー、値)を返します。
- })。 to ( "demo" 、 (Serdes.String()、jsonSerde)を使用して生成されます) ;
- ストリームを返します。
- }
実行結果:
グループ処理 - @ビーン
- パブリックKStream<文字列、メッセージ> kStream5(StreamsBuilder streamsBuilder) {
- JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
- JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
- descri.addTrustedPackages( "*" );
- KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
- stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
- @オーバーライド
- パブリック文字列適用(文字列キー、メッセージ値) {
- 戻り値.getOrgCode();
- }
- })
- .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
- .count () 関数
- .toStream().print(Printed.toSysOut());
- ストリームを返します。
- }
実行結果:
重合 - @ビーン
- パブリックKStream<文字列、メッセージ> kStream6(StreamsBuilder streamsBuilder) {
- JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
- JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
- descri.addTrustedPackages( "*" );
- KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
- stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
- @オーバーライド
- パブリック文字列適用(文字列キー、メッセージ値) {
- 戻り値.getOrgCode();
- }
- })
- .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
- .aggregate(() -> 0L, (キー、値、aggValue) -> {
- システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
- aggValue + 1を返します。
- }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
- .toStream().print(Printed.toSysOut());
- ストリームを返します。
- }
実行結果:
フィルター データをフィルター - @ビーン
- パブリックKStream<文字列、メッセージ> kStream7(StreamsBuilder streamsBuilder) {
- JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
- JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
- descri.addTrustedPackages( "*" );
- KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
- stream.selectKey(新しいKeyValueMapper<String, Message, String>() {
- @オーバーライド
- パブリック文字列適用(文字列キー、メッセージ値) {
- 戻り値.getOrgCode();
- }
- })
- .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
- .aggregate(() -> 0L, (キー、値、aggValue) -> {
- システム。出力.println( "キー = " +キー+ "、値 = " + 値 + "、agg = " + aggValue);
- aggValue + 1を返します。
- }, マテリアライズド。<String, Long, KeyValueStore<Bytes,byte[]>> ( " kvs " ).withValueSerde(Serdes.Long()))
- .toStream()
- .filter((キー, 値) -> ! "2" .equals(キー))
- .print(Printed.toSysOut());
- ストリームを返します。
- }
実行結果:
フィルターキーが「2」と等しくありません ブランチマルチストリーム処理 - @ビーン
- パブリックKStream<文字列、メッセージ> kStream8(StreamsBuilder streamsBuilder) {
- JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
- JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
- descri.addTrustedPackages( "*" );
- KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
- // ブランチ、マルチストリーム処理
- KStream<文字列、メッセージ>[] arrStream = stream.branch(
- (キー, 値 ) -> "男性" .equals(value.getSex()),
- (キー、値) -> "女性" .equals(value.getSex()));
- ストリーム。 ( arrStream ).forEach(の-> {
- .foreach ((キー、メッセージ) -> {
- システム。出力.println(Thread.currentThread().getName() + ", キー = " +キー+ ", メッセージ = " + メッセージ);
- });
- });
- ストリームを返します。
- }
実行結果:
複数フィールドのグループ化 複数のselectKeyを使用することはできません。後者は前者を上書きします。 - @ビーン
- パブリックKStream<文字列、メッセージ> kStreamM2(StreamsBuilder streamsBuilder) {
- JsonSerde<メッセージ> jsonSerde = new JsonSerde<>();
- JsonDeserializer<メッセージ> descri = (JsonDeserializer<メッセージ>) jsonSerde.deserializer();
- descri.addTrustedPackages( "*" );
- KStream<String, Message> stream = streamsBuilder.stream( "test" 、Consumed. with (Serdes.String(), jsonSerde));
- ストリーム
- .selectKey(新しいKeyValueMapper<文字列、メッセージ、文字列>() {
- @オーバーライド
- パブリック文字列適用(文字列キー、メッセージ値) {
- System.out .println (Thread.currentThread().getName());
- 戻り値.getTime() + " | " + value.getOrgCode() ;
- }
- })
- .groupByKey(グループ化。 (Serdes.String()、jsonSerde)を使用)
- .count () 関数
- .toStream().print(Printed.toSysOut());
- ストリームを返します。
- }
実行結果: |