SpringBootは分散メッセージングプラットフォームPulsarを統合

SpringBootは分散メッセージングプラットフォームPulsarを統合

みなさんこんにちは。ジュン兄です。

優れたメッセージストリーミングプラットフォームとして、Pulsar はますます使用されています。この記事では、Pulsar の Java クライアントについて説明します。

パルサーを展開する

Pulsar をデプロイするには、ローカル バイナリ インストール、Docker デプロイ、Kubernetes へのデプロイという 3 つの主な方法があります。

この記事では、docker を使用して単一ノードの Pulsar クラスターをデプロイします。実験環境は2コアCPU、4Gメモリです。

デプロイメントコマンドは次のとおりです。

  1. docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar スタンドアロン 

インストール プロセス中に次のエラーが発生する可能性があります。

  1. 不明なフラグ: --mount  
  2. 「docker run --help」を参照してください

これは、docker のバージョンが低く、マウント パラメータをサポートしていないためです。 docker バージョンを 17.06 以上にアップグレードするだけです。

ネットワーク上の理由により、展開プロセスが失敗する可能性がありますが、数回試行すると成功する可能性があります。次のログが表示されれば、起動は成功したことになります。

  1. 2022-01-08T22:27:58,726+0000 [main] INFO org.apache.pulsar.broker.PulsarService - メッセージング サービスは準備完了、ブートストラップ サービス ポート = 8080、ブローカー URL = pulsar://localhost:6650、クラスター = スタンドアロン

ローカルのシングルノードクラスタが起動すると、public/defaultという名前空間が作成されます。

パルサークライアント

現在、Pulsar は以下を含む複数の言語でクライアントをサポートしています。

Java クライアント Go クライアント Python クライアント C++ クライアント Node.js クライアント WebSocket クライアント C# クライアント

SpringBoot 構成

SpringBoot を使用して Pulsar クライアントを統合します。まず、Pulsar クライアントの依存関係を導入します。コードは次のとおりです。

  1. <依存関係>
  2. <グループ ID>org.apache.pulsar</グループ ID>
  3. <artifactId>パルサークライアント</artifactId>
  4. <バージョン>2.9.1</バージョン>
  5. </依存関係>

次に、プロパティ ファイルに構成を追加します。

  1. # パルサーアドレス
  2. pulsar.url=pulsar://192.168.59.155:6650
  3. # トピック
  4. pulsar.topic=テストトピック
  5. # 消費者グループ 
  6. pulsar.subscription=トピックグループ

クライアントの作成

クライアントの作成は非常に簡単で、コードは次のとおりです。

  1. クライアント = PulsarClient.builder()
  2. .serviceUrl(URL) は、
  3. 。建てる();

上記の URL は、プロパティ ファイルで定義されている pulsar.url です。

クライアントを作成するときに、クラスターが正常に起動されなかったとしても、実際にはまだクラスターに接続されていないため、プログラムはエラーを報告しません。

プロデューサーを作成

  1. プロデューサー = client.newProducer()
  2. .topic(トピック)
  3. .compressionType(圧縮タイプ.LZ4)
  4. .sendTimeout(0, 時間単位.SECONDS)
  5. .enableBatching( true )
  6. .batchingMaxPublishDelay(10, 時間単位.ミリ秒)
  7. .バッチ処理最大メッセージ数(1000)
  8. .maxPendingMessages(1000)
  9. .blockIfQueueFull( true )
  10. .roundRobinRouterBatchingPartitionSwitchFrequency(10)
  11. .batcherBuilder( BatcherBuilder.DEFAULT )
  12. 。作成する();

プロデューサーを作成すると、実際にクラスターに接続します。クラスターに問題がある場合は、接続エラーが報告されます。

プロデューサーを作成するためのパラメータについて次に説明します。

トピック: プロデューサーが書きたいトピック。

compressionType: 圧縮戦略。現在、4 つの戦略がサポートされています (NONE、LZ4、ZLIB、ZSTD)。 Pulsar 2.3 以降では、この戦略はコンシューマー バージョンが 2.3 以上の場合にのみ有効になります。

sendTimeout: タイムアウト期間。プロデューサーがタイムアウト期間内に ACK を受信しない場合、メッセージを再送信します。

enableBatching: メッセージのバッチ処理を有効にするかどうか。デフォルト値は true です。このパラメータは、非同期送信 (sendAsync) の場合にのみ有効です。同期送信を選択した場合は無効になります。

batchingMaxPublishDelay: メッセージをバッチで送信する期間。ここでは10msと定義します。バッチ処理時間を設定すると、メッセージの数によって影響を受けなくなることに注意してください。バッチ送信では、送信するバッチ メッセージを 1 つのネットワーク パケットに入れて送信するため、ネットワーク IO 回数が削減され、ネットワーク カードの送信効率が大幅に向上します。

batchingMaxMessages: バッチで送信されるメッセージの最大数。

maxPendingMessages: ブローカーからの ACK の受信を待機しているメッセージ キューの最大長。キューがいっぱいの場合、blockIfQueueFull 値が true に設定されていない限り、プロデューサーのすべての sendAsync および send 呼び出しは失敗します。

blockIfQueueFull: プロデューサーがメッセージを送信すると、まずそのメッセージがローカル キュー キャッシュに格納されます。キャッシュがいっぱいになると、メッセージの送信がブロックされます。

roundRobinRouterBatchingPartition-SwitchFrequency: メッセージを送信するときにキーが指定されていない場合、デフォルトでラウンドロビン方式を使用してメッセージが送信されます。ラウンドロビン方式を使用する場合、パーティション切り替え周期は (頻度 * batchingMaxPublishDelay) になります。

消費者の創造

Pulsar の消費モデルは次のとおりです。

図からわかるように、コンシューマーは消費する前にサブスクリプションをバインドする必要があります。

  1. Consumer = client.newConsumer()
  2. .topic(トピック)
  3. .subscriptionName(サブスクリプション)
  4. .subscriptionType(サブスクリプションタイプ.Shared)
  5. .subscriptionInitialPosition(サブスクリプションの初期位置.Earliest)
  6. .negativeAck再配信遅延(60, TimeUnit.SECONDS)
  7. .レシーバーキューサイズ(1000)
  8. 。購読する();

以下では、コンシューマーを作成するためのパラメータについて説明します。

トピック: コンシューマーがサブスクライブするトピック。

subscriptionName: コンシューマーが関連付けるサブスクリプションの名前。

subscriptionType: サブスクリプションの種類。 Pulsar は次の 4 種類のサブスクリプションをサポートしています。

排他: 排他モード。同じトピックに対してコンシューマーは 1 つしか存在できません。コンシューマーが複数存在する場合、エラーが発生します。フェイルオーバー: 災害復旧モード。同じトピックには複数のコンシューマーが存在する可能性がありますが、消費できるのは 1 つのコンシューマーだけです。他のコンシューマーはフェイルオーバー バックアップとして機能します。現在のコンシューマーに障害が発生した場合、バックアップ コンシューマーの 1 つが選択されて消費されます。以下のように表示されます。

共有: 共有モードでは、同じトピックを複数のコンシューマーがサブスクライブして使用できます。メッセージはラウンドロビン ポーリング メカニズムを通じてさまざまなコンシューマーに配信され、各メッセージは 1 つのコンシューマーにのみ配信されます。コンシューマーが切断された場合、そのコンシューマーに送信されたメッセージが消費されていない場合、これらのメッセージは他の存続しているコンシューマーに再配布されます。以下のように表示されます。

Key_Shared: メッセージとコンシューマーの両方がキーにバインドされ、メッセージは同じキーにバインドされたコンシューマーにのみ送信されます。新しいコンシューマーが接続を確立するか、コンシューマーが切断された場合、一部のメッセージのキーを更新する必要があります。 Shared モードと比較すると、Key_Shared の利点は、同じキーのメッセージの順序を確保しながら、コンシューマーが同時にメッセージを消費できることです。以下のように表示されます。

subscriptionInitialPosition: 新しいサブスクリプションを作成するときに消費を開始する場所。次の 2 つのオプションがあります。

最新: 最新のメッセージから消費を開始します。最古: 最も古いメッセージから消費を開始します。

negativeAckRedeliveryDelay: 消費失敗後にブローカーが再送信する間隔。

受信キューサイズ: 受信メソッドが呼び出される前に蓄積できるメッセージの最大数。 0 に設定すると、ブローカーから一度にプルされるメッセージは 1 つだけになります。共有モードでは、バッチ メッセージが複数のコンシューマーに送信され、他のコンシューマーがアイドル状態になるのを防ぐために、receiverQueueSize は 0 に設定されます。

コンシューマーがメッセージを受信する方法は、同期単一、同期バッチ、非同期単一、非同期バッチの 4 つがあります。コードは次のとおりです。

  1. メッセージ message = consumer.receive()
  2. CompletableFuture<Message> メッセージ = consumer.receiveAsync();
  3. メッセージ message = consumer.batchReceive();
  4. CompletableFuture<Messages> メッセージ = consumer.batchReceiveAsync();

バッチ受信の場合は、バッチ受信戦略も設定できます。コードは次のとおりです。

  1. Consumer = client.newConsumer()
  2. .topic(トピック)
  3. .subscriptionName(サブスクリプション)
  4. .batchReceivePolicy(BatchReceivePolicy.builder()
  5. .最大メッセージ数(100)
  6. .maxNumBytes(1024 * 1024)
  7. .timeout(200, 時間単位.ミリ秒)
  8. 。建てる())
  9. 。購読する();

コード内のパラメータは次のように記述されます。

maxNumMessages: バッチで受信されるメッセージの最大数。 maxNumBytes: 受信したバッチメッセージのサイズ。ここでは 1MB です。

テスト

まず、Producer がメッセージを送信するためのコードを次のように記述します。

  1. パブリックvoid sendMsg(文字列キー、文字列データ) {
  2. CompletableFuture<MessageId> future = producer.newMessage()
  3. キー(キー)
  4. .value(data.getBytes()).sendAsync();
  5. future.handle((v, ex) -> {
  6. if (ex == null ) {
  7. logger.info( "メッセージは正常に送信されました、キー:{}、メッセージ:{}" キー、データ);
  8. }それ以外{
  9. logger.error( "メッセージの送信に失敗しました。キー:{}、メッセージ:{}" キー、データ);
  10. }
  11. 戻る ヌル;
  12. });
  13. 将来参加();
  14. logger.info( "メッセージの送信が完了しました、キー:{}、メッセージ:{}" キー、データ);
  15. }

次に、メッセージを消費するための Consumer コードを次のように記述します。

  1. パブリックvoid start() は例外をスローします{
  2. )の間{
  3. メッセージ message = consumer.receive();
  4. 文字列キー= message.getKey();
  5. 文字列データ = new String(message.getData());
  6. 文字列トピック = message.getTopicName();
  7. StringUtils.isNotEmpty(データ)の場合{
  8. 試す{
  9. logger.info( "受信メッセージ、トピック:{}、キー:{}、データ:{}" 、トピック、キー、データ);
  10. }catch(例外 e){
  11. logger.error( "メッセージ受信例外、トピック:{}、キー:{}、データ:{}" 、トピック、キー、データ、e);
  12. }
  13. }
  14. 消費者はメッセージを確認します。
  15. }
  16. }

最後に、Producer を呼び出してメッセージを送信する Controller クラスを記述します。コードは次のとおりです。

  1. @RequestMapping( "/送信" )
  2. @レスポンス本文
  3. パブリック文字列送信(@RequestParam 文字列キー、@RequestParam 文字列データ) {
  4. logger.info( "メッセージ送信要求を受信しました、キー:{}、値:{}" キー、データ);
  5. pulsarProducer.sendMsg(キー、データ );
  6. 戻る  "成功" ;
  7. }

プロデューサーを呼び出して、キー = key1、データ = data1 のメッセージを送信します。具体的な操作は、ブラウザに次の URL を入力して Enter キーを押すことです。

  1. http://192.168.157.1:8083/pulsar/send?キー=key1&データ =data1

コンソールに次のログ出力が表示されます。

  1. 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - メッセージを正常に送信しました。キー: key1、メッセージ: data1
  2. 2022-01-08 22:42:33,200 [http-nio-8083- exec -1] [INFO] boot.pulsar.PulsarProducer - メッセージの送信が完了しました。キー: key1、メッセージ: data1
  3. 2022-01-08 22:42:33,232 [スレッド-22] [INFO] boot.pulsar.PulsarConsumer - メッセージを受信しました。トピック:persistent:// public / default /testTopic、キー: key1、データ: data1
  4. 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] プリフェッチされたメッセージ: 0 --- 受信した消費スループット: 0.02 メッセージ/秒 --- 0.00 メガビット/秒 --- 確認応答送信レート: 0.02 確認応答/秒 --- 失敗したメッセージ: 0 --- バッチメッセージ: 0 --- 失敗した確認応答: 0  
  5. 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] 保留中のメッセージ: 0 --- パブリッシュ スループット: 0.02 メッセージ/秒 --- 0.00 メガビット/秒 --- レイテンシ: 中: 69.000 ミリ秒 - 95 パーセント: 69.000 ミリ秒 - 99 パーセント: 69.000 ミリ秒 - 99.9 パーセント: 69.000 ミリ秒 - 最大: 69.000 ミリ秒 --- 確認応答受信率: 0.02 確認応答/秒 --- 失敗したメッセージ: 0  

ログから、ここで使用されている名前空間は、クラスターの作成時に生成された public/default であることがわかります。

要約する

SpringBoot と Java クライアントの統合の観点から見ると、Pulsar の API は非常にフレンドリーで使いやすいです。 Consumer を使用する場合は、バッチ、非同期、サブスクリプション タイプなど、さらに考慮する必要があります。

<<:  企業情報化の集中構築は分散ユニットアーキテクチャに戻るべきである

>>:  マルチクラウドトランスポートネットワークシステムの構築方法

推薦する

hostsolutions: ルーマニア VPS、年間 25 ユーロ、KVM/1G メモリ/1 コア/30gNVMe/10T トラフィック/1Gbps 帯域幅

Hostsolutions は、ルーマニアの Oradea データセンターで KVM シリーズ VP...

キリバは多国籍企業の財務管理の「3つの柱」をサポートします

少し前、シルク・ドゥ・ソレイユが破産を申請したというニュースが、あらゆる階層から深い遺憾の意を引き起...

Android チャンネルで APP アプリケーションを有料プロモーションするための 3 つのヒント

1. 本物のユーザーを見つけたいなら、アプリストアにお金を払おう●利点:実際のユーザーが多く、信頼性...

外部リンクの開発履歴を振り返り、外部リンクの今後の開発方向を期待する

外部リンクは、ウェブサイトのキーワードランキングに影響を与える最も重要な要素の 1 つです。現在、ほ...

301 永続リダイレクトの実装と 302 リダイレクト

ウェブサイトの構築では、ウェブページのリダイレクトが必要な状況によく遭遇します。たとえば、ウェブペー...

海外のVPSレンタル業者が推奨する、あなた自身のVPSレンタル体験をまとめ、共有する

海外VPS(海外VPSレンタル、海外VPSレンタル)をレンタルする場合、(1)国内市場と比較した速度...

クラウド キャッシュ アプライアンスとゲートウェイを最大限に活用する方法

クラウド ストレージ キャッシュにより、パフォーマンスを低下させる遅延を削減できます。これらのデバイ...

ユーザーのニーズを満たすプロセスの重要なポイントを特定する: プロダクトマネージャーが必ず読むべき 9 つのステップの方法

私はこの記事を世界を変えることを夢見る人々に捧げますシュ・シュン何年も経って、あの若いプロダクトマネ...

クラウドコンピューティングの最適化におけるよくある省略

クラウド コンピューティングの最適化という概念は、企業がクラウド コンピューティングの価値を懸念して...

深夜、仮想マシンからプログラムが実行してしまいました...

[[386470]]画像はPexelsより罠に落ちる「次兄さん、やっと来たね。計画通りにやろう。ハー...

衝撃を受けた:田舎娘のSEOに対する理解

ウェブサイトの構造は SEO の基礎です。ウェブサイト内の最適化は、大きく分けて 2 つの部分に分け...

中国の化粧品トレンドに関する洞察

今年上半期が終わりに近づく中、中国の化粧品業界ではどんな新たなトレンドが生まれているのでしょうか? ...

2019年メーデー休暇旅行アプリ市場レポート!

先日終了したメーデーの休日中、目的地やアトラクションでは、おなじみの「群衆に従う」休日パターンが再び...

Baidu Sitelinkの立ち上げは、個々のウェブマスターにとっての今後の方向性を示している。

少し前に、Baidu はサイトリンク機能を正式にリリースしました。これにより、多くの有名な大規模ウェ...

Godaddy-エコノミーホスティング年間支払い 12 ドル (100G ハードドライブ)

godaddy からわずか 12 ドルで、PayPal、クレジットカード、Alipay をサポートす...