Spring Boot が Kafka を統合: spring-kafka の詳細な調査

Spring Boot が Kafka を統合: spring-kafka の詳細な調査

序文

Kafka は、トピック パーティションに基づいて設計され、非常に高いメッセージ送信および処理パフォーマンスを実現できるメッセージ キュー製品です。 Spring は、Apache の Kafka クライアントをカプセル化し、Spring プロジェクトに Kafka を迅速に統合するために使用される Spring-kafka プロジェクトを作成しました。単純なメッセージの送受信に加えて、Spring-kafka は多くの高度な機能も提供します。これらの用途を一つずつ見ていきましょう。

プロジェクトアドレス: https://github.com/spring-projects/spring-kafka

シンプルな統合

依存関係の導入

  1. <依存関係> <グループ ID > org.springframework.kafka </グループ ID > <アーティファクト ID > spring-kafka </アーティファクト ID > <バージョン> 2.2.6.RELEASE </バージョン> </依存関係>  

設定を追加

  1. spring.kafka.producer.bootstrap-servers = 127.0.0.1 :9092

送受信をテストする

  1. /**  
  2. * @著者: kl @kailing.pub  
  3. * @日付: 2019/5/30  
  4. */  
  5. @SpringBootアプリケーション 
  6. @レストコントローラ 
  7. パブリッククラスアプリケーション{  
  8. プライベート最終 Logger logger = LoggerFactory .getLogger(Application.class);  
  9. パブリック静的voidメイン(String[] args) {  
  10. SpringApplication.run(Application.class、引数);  
  11. }
  12. オートワイヤード 
  13. プライベート KafkaTemplate < Object , Object >テンプレート;  
  14. @GetMapping("/send/{input}")  
  15. public void sendFoo(@PathVariable String input) {  
  16. this.template.send("topic_input", 入力);  
  17. }  
  18. @KafkaListener( id = "webGroup" トピック= "topic_input" )  
  19. パブリック void listen(文字列入力) {  
  20. logger.info("入力値: {}" , input);  
  21. }  
  22. }

アプリケーションを起動したら、ブラウザに http://localhost:8080/send/kl と入力します。コンソールにログ出力が表示されます: 入力値: "kl"。基本的な使い方はとても簡単です。メッセージを送信するときに KafkaTemplate を挿入し、メッセージを受信するときに @KafkaListener アノテーションを追加します。

Spring-kafka-test組み込み Kafka サーバー

ただし、Kafka Server サービス環境がすでにある場合は、上記のコードを正常に起動できます。 Kafka は Scala + Zookeeper で構築されており、公式サイトからデプロイメント パッケージをダウンロードしてローカルにデプロイできることがわかっています。ただし、開発プロセスを簡素化し、Kafka 関連の機能を検証するために、Spring-Kafka-Test は Kafka-test をカプセル化し、アノテーション スタイルのワンクリックで Kafka Server 機能を起動できるようにしており、これも非常に使いやすいことをお伝えしたいと思います。この記事のすべての Kafka テスト ケースは、この組み込みサービスを使用して提供されます。

依存関係の導入

  1. <依存関係> <グループ ID > org.springframework.kafka </グループ ID > <アーティファクト ID > spring-kafka-test </アーティファクト ID > <バージョン> 2.2.6.RELEASE </バージョン> <スコープ>テスト</スコープ> </依存関係>  

サービスを開始する

次の Junit テスト ケースは、4 つのブローカー ノードを含む Kafka サーバー サービスを直接起動するために使用されます。

  1. @RunWith(SpringRunner.class)@SpringBootTest( classes = ApplicationTests .class)@EmbeddedKafka( count = 4 ports = {9092,9093,9094,9095})public クラス ApplicationTests {@Testpublic void contextLoads()throws IOException { System.in.read(); }}

上記のように、@EmbeddedKafka というアノテーションを 1 つ付けるだけで、完全に機能する Kafka サービスを開始できます。かっこいいでしょう?デフォルトでは、パラメータなしでアノテーションのみが書き込まれると、ランダムなポートを持つブローカーが作成されます。特定のポートといくつかのデフォルトの構成項目が起動ログに出力されます。ただし、Kafka インストール パッケージ構成ファイル内のこれらの構成項目は、アノテーション パラメータで構成できます。以下は、@EmbeddedKafka アノテーションで設定可能なパラメータの詳細な説明です。

  • 値: ブローカーノードの数
  • カウント: 値と同じ、ブローカーに設定されているノードの数
  • 制御されたシャットダウン スイッチ。主に、ブローカーが予期せずシャットダウンした場合に、このブローカーのパーティションが利用できなくなる時間を短縮するために使用されます。

Kafka は、マルチブローカー アーキテクチャを備えた高可用性サービスです。 1 つのトピックは複数のパーティションに対応し、1 つのパーティションには複数のレプリカを含めることができます。これらのレプリケーション コピーは、高可用性を実現するために複数のブローカーに保存されます。ただし、パーティション レプリカ セットは複数ありますが、現在動作しているレプリカ セットは 1 つだけです。デフォルトでは、最初に割り当てられたレプリカ セット [優先レプリカ] がリーダーとなり、データの書き込みと読み取りを担当します。ブローカーをアップグレードしたり、ブローカーの構成を更新したりする場合は、サービスを再起動する必要があります。この時点で、パーティションを利用可能なブローカーに転送する必要があります。以下に3つの状況が関係している。

  1. ブローカーを直接シャットダウンする: ブローカーがシャットダウンされると、ブローカー クラスターは新しいブローカーをパーティション リーダーとして再選出します。選挙中、このブローカーのパーティションはしばらくの間利用できなくなります。
  2. 制御されたシャットダウンを有効にする: ブローカーがシャットダウンされると、ブローカー自体は最初にリーダーの役割を他の利用可能なブローカーに転送しようとします。
  3. コマンドラインツールを使用する: bin/kafka-preferred-replica-election.sh を使用して、PartitionLeader ロールの転送を手動でトリガーします。
  • ports: ポート リスト (配列)。カウントパラメータに対応して、複数のポート番号に対応する複数のブローカーが存在する。
  • brokerProperties: ブローカー パラメータ設定。配列構造であり、ブローカー パラメータ設定に対して次のメソッドをサポートします。
  1. @EmbeddedKafka(ブローカープロパティ= {" log.index.interval.bytes = 4096"," num .io.threads = 8 "})
  • kerPropertiesLocation: ブローカーパラメータファイル設定

機能は上記の brokerProperties と同じですが、Kafka Broker には 182 個の構成可能なパラメーターがあります。上記のようにすべてを構成することは決して最善の解決策ではないため、次のようなローカル構成ファイルを読み込む機能が提供されています。

  1. @EmbeddedKafka(ブローカープロパティの場所= "classpath:application.properties" )

デフォルトでは、KafkaTemplate を使用してメッセージを送信するときにトピックが存在しない場合は、新しいトピックが作成されます。パーティションとレプリカのデフォルトの数は、次のブローカー パラメータによって設定されます。

新しいトピックを作成する

  1. num.partitions = 1 #トピックパーティションのデフォルト数 
  2. num.replica.fetchers = 1 #レプリカのデフォルト数

プログラムの開始時にトピックを作成する

  1. /**  
  2. * @著者: kl @kailing.pub  
  3. * @日付: 2019/5/31  
  4. */  
  5. @構成 
  6. パブリッククラスKafkaConfig{  
  7. @ビーン 
  8. パブリックKafkaAdmin管理者(KafkaPropertiesプロパティ){  
  9. KafkaAdmin管理者=新しいKafkaAdmin(properties.buildAdminProperties());  
  10. admin.setFatalIfBrokerNotAvailable(true);  
  11. 管理者に戻ります。  
  12. }  
  13. @ビーン 
  14. パブリック NewTopic topic2() {  
  15. 新しい NewTopic("topic-kl", 1, (short) 1) を返します。  
  16. }  
  17. }

Kafka Broker がサポートしている場合 (バージョン 1.0.0 以上)、既存のトピックのパーティション数が設定されたパーティション数より少ないことが判明した場合、新しいパーティションが追加されます。 KafkaAdmin の一般的な用途は、次のとおりです。

setFatalIfBrokerNotAvailable(true): デフォルト値は False です。これは、Broker が利用できない場合の Spring コンテキストの初期化には影響しません。ブローカーが利用できないことが通常のビジネス ニーズに影響すると思われる場合は、この値を True に設定します。

setAutoCreate(false) : デフォルト値は True です。これは、インスタンス化後に Kafka がインスタンス化された NewTopic オブジェクトを自動的に作成することを意味します。

初期化(): setAutoCreateがfalseの場合、プログラムは管理者の初期化()メソッドを明示的に呼び出してNewTopicオブジェクトを初期化する必要があります。

コードロジックで作成

プログラムを起動するときにトピックに必要なパーティションの数がわからないことがありますが、ブローカーのデフォルト設定をそのまま使用することはできません。この場合、Kafka-Client に付属する AdminClient を使用して処理する必要があります。上記の Spring によってカプセル化された KafkaAdmin も AdminClient を使用して処理されます。のように:

  1. オートワイヤード 
  2. プライベート KafkaProperties プロパティ。  
  3. @テスト 
  4. パブリック void testCreateToipc(){  
  5. AdminClientクライアント= AdminClient .create(properties.buildAdminProperties());  
  6. if(クライアント != null){  
  7. 試す {  
  8. コレクション< NewTopic >  新しいnewTopics = 新しい ArrayList < > (1);  
  9. 新しいトピックを追加します(新しい NewTopic("topic-kl",1,(short) 1));  
  10. クライアント。トピックを作成します(新しいトピック)。  
  11. }キャッチ (Throwable e){  
  12. e.printStackTrace();  
  13. }ついに {  
  14. クライアントを閉じます。  
  15. }  
  16. }  
  17. }

追伸: トピックを作成する他の方法

上記のトピック作成方法では、spring-kafka2.x は spring boot2.x のみをサポートしているため、Spring Boot のバージョンが 2.x 以上である必要があります。これらの API はバージョン 1.x では使用できません。プログラム内でKafka_2.10を介してトピックを作成する方法は次のとおりです。

依存関係の導入

  1. <依存関係>    
  2. <グループID> org.apache.kafka</グループID>    
  3. <アーティファクトID> kafka_2.10</アーティファクトID >    
  4. <バージョン> 0.8.2.2</バージョン>    
  5. </依存関係>  

APIで作成

  1. @テスト 
  2. パブリック void testCreateTopic() は例外をスローします{  
  3. ZkClient zkClient =新しいZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$)  
  4. 文字列topicName = "topic-kl" ;  
  5. intパーティション= 1 ;  
  6. 整数レプリケーション= 1 ;  
  7. AdminUtils.createTopic(zkClient、トピック名、パーティション、レプリケーション、新しいプロパティ());  
  8. }

ZkClient の最後の構築パラメータは、シリアル化と逆シリアル化のインターフェース実装であることに注意してください。ブロガーがテスト中に記入しないと、ZK で作成されたトピックのデータに問題が生じます。デフォルトの Kafka 実装も非常にシンプルで、文字列の UTF-8 エンコード処理を実行します。 ZKStringSerializer$ は、Kafka に実装されたインターフェース インスタンスです。 Scala のコンパニオン オブジェクトです。 Java で MODULE$ を直接呼び出すことでインスタンスを取得できます。

コマンドモードの作成

  1. @テスト 
  2. パブリック void testCreateTopic(){  
  3. 文字列[]オプション=新しい文字列[]{  
  4. " - 作成する"、  
  5. "--zookeeper","127.0.0.1:2181",  
  6. "--レプリケーション係数", "3",  
  7. "--パーティション", "3",  
  8. "--トピック"、"トピック-kl"  
  9. };  
  10. TopicCommand.main(オプション);  
  11. }

メッセージ送信のための KafkaTemplate の調査

送信結果を取得する

非同期取得

  1. template.send("","").addCallback(新しいListenableFutureCallback < SendResult < Object , Object >> ( ) {  
  2. @オーバーライド 
  3. パブリックvoid onFailure(Throwable throwable) {  
  4. ......  
  5. }  
  6. @オーバーライド 
  7. パブリックvoid onSuccess(SendResult < Object 、 Object > objectObjectSendResult) {  
  8. ....  
  9. }  
  10. });

同期取得

  1. ListenableFuture < SendResult <オブジェクトオブジェクト>>   future =テンプレート.send("topic-kl","kl");  
  2. 試す {  
  3. SendResult <オブジェクト、オブジェクト>  結果= future .get();  
  4. }キャッチ (Throwable e){  
  5. e.printStackTrace();  
  6. }

Kafka トランザクション メッセージ

デフォルトでは、Spring-kafka によって自動的に生成される KafkaTemplate インスタンスには、トランザクション メッセージを送信する機能がありません。トランザクション機能を有効にするには、次の設定が必要です。トランザクションがアクティブ化された後、すべてのメッセージ送信はトランザクションが発生したメソッド内でのみ実行できます。それ以外の場合は、トランザクションがないことを示す例外がスローされます。

  1. spring.kafka.producer.transaction-id-prefix = kafka_tx

たとえば、メッセージの送信にトランザクションが必要な場合、次の例のように、すべてのメッセージが正常に送信されます。最初のメッセージが送信された後、2 番目のメッセージが送信される前に例外が発生すると、送信された最初のメッセージもロールバックされます。さらに、通常の状況では、最初のメッセージが送信され、その後一定期間スリープしてから 2 番目のメッセージを送信すると仮定すると、コンシューマーはトランザクション メソッドが実行された後にのみメッセージを受信します。

  1. @GetMapping("/send/{input}")  
  2. public void sendFoo(@PathVariable String input) {  
  3. テンプレート.executeInTransaction(t - > {  
  4. t.send("topic_input","kl");  
  5. if ("エラー".equals(入力)) {  
  6. 新しい RuntimeException("失敗") をスローします。  
  7. }  
  8. t.send("topic_input","ckl");  
  9. true を返します。  
  10. });  
  11. }

トランザクション機能が有効化されると、メソッドに@Transactionalアノテーションを追加することでも有効になります。

  1. @GetMapping("/send/{input}")  
  2. @Transactional(ロールバックFor = RuntimeException .class)  
  3. public void sendFoo(@PathVariable String input) {
  4.   テンプレートを送信します("topic_input", "kl");  
  5. if ("エラー".equals(入力)) {  
  6. 新しい RuntimeException("失敗") をスローします。  
  7. }  
  8. テンプレートを送信します("topic_input", "ckl");  
  9. }

Spring-Kafka のトランザクション メッセージングは​​、Kafka が提供するトランザクション メッセージング機能に基づいています。 Kafka ブローカーのデフォルト構成は、3 つ以上のブローカーの高可用性サービス用に設定されています。テスト中のシンプルさと利便性のために、組み込みサービスを使用して単一ブローカーの Kafka サービスを作成しましたが、いくつかの問題が発生しました。

1. トランザクション ログ レプリカ セットがブローカーの数より大きい場合、次の例外がスローされます。

  1. 有効なブローカーの数 '1' は、必要なレプリケーション係数 '3' を満たしていません 
  2. トランザクション状態トピック用 ('transaction.state.log.replication.factor' で構成)。  
  3. クラスターが起動中で、すべてのブローカーがまだ起動していない場合は、このエラーは無視できます。

デフォルトのブローカー構成transaction.state.log.replication.factor=3、単一ノードは1にしか調整できません

2. レプリカの数がレプリカ同期キューの数より少ない場合、次の例外がスローされます。

  1. パーティション__transaction_state-13の同期レプリカの数は[1]で、必要な最小数[2]を下回っています。

デフォルトのブローカー構成はtransaction.state.log.min.isr=2で、単一ノードは1にしか調整できません。

ReplyKafkaTemplate はメッセージの返信を取得します

ReplyKafkaTemplate は KafkaTemplate のサブクラスです。親クラスのメソッドを継承するだけでなく、メッセージの送信\返信セマンティクスを実装するための新しいメソッドsendAndReceiveを追加します。

  1. RequestReplyFuture < K , V, R > sendAndReceive(ProducerRecord < K , V >レコード);

つまり、メッセージを送信すると、コンシューマーから結果が返されます。従来の RPC 対話と同じです。メッセージの送信者がメッセージ コンシューマーの特定の消費状況を知る必要がある場合、この API は非常に適しています。たとえば、メッセージでデータのバッチを送信する場合、コンシューマーが正常に処理したデータを知る必要があります。次のコードは、ReplyingKafkaTemplateを統合して使用する方法を示しています。

  1. /**  
  2. * @著者: kl @kailing.pub  
  3. * @日付: 2019/5/30  
  4. */  
  5. @SpringBootアプリケーション 
  6. @レストコントローラ
  7.  パブリッククラスアプリケーション{  
  8. プライベート最終 Logger logger = LoggerFactory .getLogger(Application.class);  
  9. パブリック静的voidメイン(String[] args) {  
  10. SpringApplication.run(Application.class、引数);  
  11. }  
  12. @ビーン 
  13. パブリック ConcurrentMessageListenerContainer <文字列, 文字列>返信コンテナ (ConcurrentKafkaListenerContainerFactory <文字列, 文字列>コンテナファクトリー) {  
  14. ConcurrentMessageListenerContainer <文字列, 文字列>  返信コンテナ= containerFactory .createContainer("返信");  
  15. 返信コンテナ.getContainerProperties().setGroupId("返信グループ");  
  16. コンテナの自動起動を false に設定します。  
  17. repliesContainer を返します。  
  18. }  
  19. @ビーン 
  20. パブリック ReplyKafkaTemplate <文字列、文字列、文字列> replyingTemplate(ProducerFactory <文字列、文字列> pf、ConcurrentMessageListenerContainer <文字列、文字列> repliesContainer) {
  21.   新しい ReplyKafkaTemplate(pf, repliesContainer) を返します。  
  22. }  
  23. @ビーン 
  24. パブリック KafkaTemplate kafkaTemplate(ProducerFactory < String , String > pf) {  
  25. 新しい KafkaTemplate(pf) を返します。  
  26. }  
  27. オートワイヤード 
  28. プライベート ReplyKafkaTemplate テンプレート;  
  29. @GetMapping("/send/{input}")  
  30. @Transactional(ロールバックFor = RuntimeException .class)  
  31. パブリック void sendFoo(@PathVariable String input) は例外をスローします {  
  32. プロデューサーレコード<文字列, 文字列>  レコード=新しいProducerRecord < > ("topic-kl", 入力);  
  33. RequestReplyFuture <文字列、文字列、文字列>   replyFuture =テンプレート.sendAndReceive(レコード);  
  34. ConsumerRecord <文字列, 文字列>  消費者レコード= replyFuture .get();  
  35. System.err.println("戻り値: " + consumerRecord.value());  
  36. }  
  37. @KafkaListener( id = "webGroup" トピック= "topic-kl" )  
  38. @送信先 
  39. パブリック文字列 listen(文字列入力) {  
  40. logger.info("入力値: {}", input);  
  41. 「成功」を返します。  
  42. }  
  43. }

Spring-kafka メッセージ消費の使用法の調査

@KafkaListener の使用

@KafkaListener のメッセージ受信機能は、これまでの簡単な統合で実証されましたが、@KafkaListener にはそれ以上の機能があります。より多くの使用シナリオを持つその他の一般的な機能は次のとおりです。

  • 消費される指定されたトピックとパーティションのメッセージを表示します。
  • 各トピックとパーティションの初期化のオフセットを設定します。
  • コンシューマスレッドの同時実行を設定する
  • メッセージ例外ハンドラを設定する
  1. @KafkaListener( id = "webGroup" トピックパーティション= {  
  2. @TopicPartition(トピック= "topic1" パーティション= {"0", "1"})、  
  3. @TopicPartition(トピック= "topic2" パーティション= "0"  
  4. パーティションオフセット= @PartitionOffset(パーティション= "1" 初期オフセット= "100" ))  
  5. },同時実行性= "6" errorHandler = "myErrorHandler" )  
  6. パブリック文字列 listen(文字列入力) {  
  7. logger.info("入力値: {}", input);  
  8. 「成功」を返します。  
  9. }

その他の注釈パラメータは簡単に理解できます。 ErrorHandler について説明する必要があります。このパラメータを設定するには、KafkaListenerErrorHandler インターフェースを実装する必要があります。アノテーション内の構成は、Spring コンテキスト内のカスタム実装インスタンスの名前です。たとえば、上記の構成は errorHandler = "myErrorHandler" です。すると、春のローンチでは次のようなインスタンスが作成されるはずです。

  1. /**  
  2. * @著者: kl @kailing.pub  
  3. * @日付: 2019/5/31  
  4. */  
  5. @Service("myErrorHandler")  
  6. パブリッククラス MyKafkaListenerErrorHandler は KafkaListenerErrorHandler を実装します {  
  7. ロガーlogger = LoggerFactory .getLogger(getClass());
  8.   @オーバーライド 
  9. パブリックオブジェクトhandleError(Message <? >メッセージ、ListenerExecutionFailedException 例外) {  
  10. logger.info(message.getPayload().toString());  
  11. null を返します。  
  12. }  
  13. @オーバーライド 
  14. パブリック オブジェクト handleError(Message <? >メッセージ、ListenerExecutionFailedException 例外、Consumer <? ?>コンシューマー) {  
  15. logger.info(message.getPayload().toString());  
  16. null を返します。  
  17. }  
  18. }

手動確認モード

手動 ACK モードでは、コミット オフセットはビジネス ロジックによって制御されます。たとえば、プログラムが消費しているとき、このセマンティクスがあり、特に異常な状況では、ACK を確認せず、つまりオフセットを送信しません。その場合は、手動の ACK モードを使用してのみ実行できます。手動送信を有効にするには、まず自動送信をオフにしてから、コンシューマーの消費モードを設定する必要があります。

  1. spring.kafka.consumer.enable-auto-commit = false    
  2. spring.kafka.listener.ack-mode =手動 

上記の設定が完了したら、使用時に @KafkaListener リスニング メソッドで Acknowledgment を入力するだけです。 ack.acknowledge() を実行すると、オフセットが送信されたことを意味します。

  1. @KafkaListener( id = "webGroup" トピック= "topic-kl" )  
  2. パブリック文字列 listen(文字列入力、確認応答 ack) {  
  3. logger.info("入力値: {}", input);  
  4. if ("kl".equals(入力)) {  
  5. ack.acknowledge();  
  6. }  
  7. 「成功」を返します。  
  8. }

@KafkaListener アノテーション リスナー ライフサイクル

@KafkaListener アノテーションが付けられたリスナーのライフサイクルを制御できます。デフォルトでは、@KafkaListener のパラメーター autoStartup = "true" です。つまり、消費は自動的に開始されますが、そのライフサイクルは KafkaListenerEndpointRegistry を通じて介入することもできます。 KafkaListenerEndpointRegistry には、start()、pause()、resume()/start、stop、continue の 3 つのアクション メソッドがあります。次のコードはこの機能を詳細に示しています。

  1. /**  
  2. * @著者: kl @kailing.pub  
  3. * @日付: 2019/5/30  
  4. */  
  5. @SpringBootアプリケーション 
  6. @レストコントローラ 
  7. パブリッククラスアプリケーション{  
  8. プライベート最終 Logger logger = LoggerFactory .getLogger(Application.class);  
  9. パブリック静的voidメイン(String[] args) {  
  10. SpringApplication.run(Application.class、引数);  
  11. }  
  12. オートワイヤード 
  13. プライベート KafkaTemplate テンプレート。  
  14. @GetMapping("/send/{input}")  
  15. @Transactional(ロールバックFor = RuntimeException .class)  
  16. パブリック void sendFoo(@PathVariable String input) は例外をスローします {  
  17. プロデューサーレコード<文字列, 文字列>  レコード=新しいProducerRecord < > ("topic-kl", 入力);  
  18. テンプレート.send(レコード);  
  19. }  
  20. オートワイヤード 
  21. プライベート KafkaListenerEndpointRegistry レジストリ。  
  22. @GetMapping("/stop/{リスナーID}")  
  23. public void stop(@PathVariable String リスナーID){  
  24. レジストリの getListenerContainer(リスナーID).pause();  
  25. }  
  26. @GetMapping("/resume/{リスナーID}")  
  27. public voidresume(@PathVariable String リスナーID){  
  28. レジストリの getListenerContainer(リスナーID).resume();  
  29. }  
  30. @GetMapping("/start/{リスナーID}")  
  31. public void start(@PathVariable String リスナーID){  
  32. レジストリの getListenerContainer(リスナーID).start();  
  33. }  
  34. @KafkaListener( id = "webGroup" トピックス= "topic-kl" autoStartup = "false" )  
  35. パブリック文字列 listen(文字列入力) {  
  36. logger.info("入力値: {}", input);  
  37. 「成功」を返します。  
  38. }  
  39. }

上記のコードでは、listenerID は @KafkaListener の ID 値「webGroup」です。プロジェクトを開始したら、次の URL をそれぞれ実行して効果を確認します。

まず、メッセージを送信します: http://localhost:8081/send/ckl。 autoStartup = "false" なので、リスナーに入るメッセージは表示されません。

次に、リスナーを起動します: http://localhost:8081/start/webGroup。メッセージが届いたことがわかります。

同様の方法を使用して、消費を一時停止したり継続したりした場合の効果をテストできます。

SendToメッセージ転送

前回のメッセージ送信応答アプリケーションで @SendTo を見ました。 @SendTo アノテーションは、応答セマンティクスの送信に加えて、転送トピック キューを指定するためのパラメータも受け取ることができます。一般的なシナリオとしては、メッセージを複数回処理する必要があり、異なる処理によって cups などの一貫性のないリソースが消費されるというものがあります。これは、異なるトピックにまたがり、異なるホストにデプロイされたコンシューマーを使用することで解決できます。のように:

  1. @KafkaListener( id = "webGroup" トピック= "topic-kl" )  
  2. @SendTo("トピック-ckl")  
  3. パブリック文字列 listen(文字列入力) {  
  4. logger.info("入力値: {}", input);  
  5. 入力 + "hello!" を返します。  
  6. }  
  7. @KafkaListener( id = "webGroup1" トピック= "topic-ckl" )  
  8. パブリック void listen2(文字列入力) {  
  9. logger.info("入力値: {}", input);  
  10. }

メッセージ再試行とデッドレターキューの適用

上記の手動 Ack モードを通じてメッセージのオフセットを制御することに加えて、Spring-kafka は再試行可能な消費メッセージのセマンティクスもカプセル化します。つまり、消費データで例外が発生した場合にメッセージを再試行するように設定できます。事前に決められたトピックにメッセージを送信するまでの再試行回数も設定できます。それがデッドレターキューです。次のコードはこの効果を示しています。

  1. オートワイヤード 
  2. プライベート KafkaTemplate テンプレート。  
  3. @ビーン 
  4. public ConcurrentKafkaListenerContainerFactory <? ?> kafkaListenerContainerFactory(  
  5. ConcurrentKafkaListenerContainerFactoryConfigurer コンフィギュラー、  
  6. ConsumerFactory <オブジェクト, オブジェクト> kafkaConsumerFactory、  
  7. KafkaTemplate <オブジェクト、オブジェクト>テンプレート) {  
  8. ConcurrentKafkaListenerContainerFactory <オブジェクト, オブジェクト>  ファクトリー=新しいConcurrentKafkaListenerContainerFactory < > ();  
  9. configurer.configure(ファクトリ、kafkaConsumerFactory);  
  10. //最大再試行回数: 3 回 
  11. factory.setErrorHandler(新しい SeekToCurrentErrorHandler(新しい DeadLetterPublishingRecoverer(テンプレート)、3));  
  12. 工場を返却する。  
  13. }  
  14. @GetMapping("/send/{input}")  
  15. public void sendFoo(@PathVariable String input) {  
  16. template.send("topic-kl", 入力);  
  17. }  
  18. @KafkaListener( id = "webGroup" トピック= "topic-kl" )  
  19. パブリック文字列 listen(文字列入力) {  
  20. logger.info("入力値: {}", input);  
  21. 新しい RuntimeException("dlt") をスローします。  
  22. }  
  23. @KafkaListener( id = "dltGroup" トピック= "topic-kl.DLT" )  
  24. パブリック void dltListen(文字列入力) {  
  25. logger.info("DLTから受信: " + 入力);  
  26. }

上記のアプリケーションでは、メッセージが topic-kl でリッスンされると、ランタイム例外がトリガーされ、再試行の最大回数に達した後、リスナーは 3 回呼び出しを試みます。メッセージは破棄され、デッドレター キューに再試行されます。デッドレターキューのトピックルールは、ビジネストピック名 + 「.DLT」です。上記のビジネス トピックの名前が「topic-kl」の場合、対応するデッドレター キュー トピックは「topic-kl.DLT」になります。

結論

最近、Kafka と Spring-kafka がビジネスで使われるようになったので、Spring-kafka のさまざまな用途を体系的に調査したところ、埋め込み Kafka サービスを有効にするアノテーション、RPC 呼び出しのような send\response セマンティック呼び出し、トランザクション メッセージ、その他の機能など、多くの興味深くクールな機能を発見しました。このブログ投稿が、Spring-kafka を使用している方、または使用しようとしている方の迂回や落とし穴を回避するのに役立つことを願っています。

<<:  AWS DeepComposer: 生成機械学習モデルによる音楽の作成

>>:  クラウドでの ERP 運用に関する 8 つの誤解に惑わされないでください

推薦する

Xiake Station Group System V3の無料版の経験とWORDPRESSプラットフォームの実践的な経験

最近、サイトグループで遊びたくなって、朝A5を開いて、Xiakeサイトグループシステムの無料版がある...

2023年に注目すべきクラウドコンピューティングの3つのトレンド

企業がコストを抑制しながら近代化を進めるにつれ、競合する物語が収束しつつあります。クラウド コンピュ...

alpha1server: 35% オフ、100M 無制限トラフィック KVM シリーズ、$3.5/1g メモリ/20g SSD

alpha1server(このサイトでは2017年に初めて紹介されました。こちらをクリック)では、現...

A5トピック: 中小電子商取引企業は発展、解雇、閉鎖の困難に直面している

2012年上半期には、多くの大手電子商取引企業が「価格戦争」に参入し、中小電子商取引企業の市場シェア...

話し合うこと: ウェブサイトが含まれていない状況

なぜウェブサイトが含まれないのか、なぜ含まれるのがこんなに遅いのか、なぜ含まれるのがこんなに少ないの...

Prometheus が NAT 経由でメトリックをスクレイピングできるようにするツール - PushProx

Prometheus は Pull モードを使用して監視インジケーターをプルすることがわかっています...

知乎の新しいコンテンツ基準の分析

Zhihuであれ、ショート動画プラットフォームのDouyinやKuaishouであれ、すべてのコンテ...

SEOの受注には多くの知識が必要であり、私たちは「控えめ」である必要があります

みなさんこんにちは。私はChen Nianです。今日は、SEO の注文を受ける際に遭遇したいくつかの...

デルテクノロジーズ、AIとエッジコンピューティング向けの次世代PowerEdgeサーバーを発表

Dell Technologies (NYSE: DELL) は本日、より強力で安全な新世代の De...

インターネットビッグディール時代の幻想:その後の統合が問題となる

昨年からテクノロジー企業間の合併や買収投資が増加しており、合併や買収の対象も中小企業から大企業へと移...

クラウド時代のインターネットジャンル

01シリコンバレーのインターネットの「先進的なアイデア」は、かつての主導的地位を失いつつある。 Pa...

マーケティング事例: Jiuxian.com を試してみたところ、良いと感じました。

この記事はワイン愛好家の観点から書かれたもので、Jiuxian.com を宣伝するものではありません...

SEOチュートリアル: 無視できないウェブサイト内部構造の最適化

多くの人は SEO を行う際に混乱し、ウェブサイトのデータ、トラフィックが変化したかどうか、キーワー...

企業のウェブサイトを最適化するにはどうすればいいでしょうか?

インターネットの急速な成長と発展に伴い、ますます多くの伝統的な企業が徐々にインターネット分野に移行し...