SpringBootは分散メッセージングプラットフォームPulsarを統合

SpringBootは分散メッセージングプラットフォームPulsarを統合

みなさんこんにちは。ジュン兄です。

優れたメッセージストリーミングプラットフォームとして、Pulsar はますます使用されています。この記事では、Pulsar の Java クライアントについて説明します。

パルサーを展開する

Pulsar をデプロイするには、ローカル バイナリ インストール、Docker デプロイ、Kubernetes へのデプロイという 3 つの主な方法があります。

この記事では、docker を使用して単一ノードの Pulsar クラスターをデプロイします。実験環境は2コアCPU、4Gメモリです。

デプロイメントコマンドは次のとおりです。

  1. docker run -it -p 6650:6650 -p 8080:8080 --mount source=pulsardata,target=/pulsar/data --mount source=pulsarconf,target=/pulsar/conf apachepulsar/pulsar:2.9.1 bin/pulsar スタンドアロン 

インストール プロセス中に次のエラーが発生する可能性があります。

  1. 不明なフラグ: --mount  
  2. 「docker run --help」を参照してください

これは、docker のバージョンが低く、マウント パラメータをサポートしていないためです。 docker バージョンを 17.06 以上にアップグレードするだけです。

ネットワーク上の理由により、展開プロセスが失敗する可能性がありますが、数回試行すると成功する可能性があります。次のログが表示されれば、起動は成功したことになります。

  1. 2022-01-08T22:27:58,726+0000 [main] INFO org.apache.pulsar.broker.PulsarService - メッセージング サービスは準備完了、ブートストラップ サービス ポート = 8080、ブローカー URL = pulsar://localhost:6650、クラスター = スタンドアロン

ローカルのシングルノードクラスタが起動すると、public/defaultという名前空間が作成されます。

パルサークライアント

現在、Pulsar は以下を含む複数の言語でクライアントをサポートしています。

Java クライアント Go クライアント Python クライアント C++ クライアント Node.js クライアント WebSocket クライアント C# クライアント

SpringBoot 構成

SpringBoot を使用して Pulsar クライアントを統合します。まず、Pulsar クライアントの依存関係を導入します。コードは次のとおりです。

  1. <依存関係>
  2. <グループ ID>org.apache.pulsar</グループ ID>
  3. <artifactId>パルサークライアント</artifactId>
  4. <バージョン>2.9.1</バージョン>
  5. </依存関係>

次に、プロパティ ファイルに構成を追加します。

  1. # パルサーアドレス
  2. pulsar.url=pulsar://192.168.59.155:6650
  3. # トピック
  4. pulsar.topic=テストトピック
  5. # 消費者グループ 
  6. pulsar.subscription=トピックグループ

クライアントの作成

クライアントの作成は非常に簡単で、コードは次のとおりです。

  1. クライアント = PulsarClient.builder()
  2. .serviceUrl(URL) は、
  3. 。建てる();

上記の URL は、プロパティ ファイルで定義されている pulsar.url です。

クライアントを作成するときに、クラスターが正常に起動されなかったとしても、実際にはまだクラスターに接続されていないため、プログラムはエラーを報告しません。

プロデューサーを作成

  1. プロデューサー = client.newProducer()
  2. .topic(トピック)
  3. .compressionType(圧縮タイプ.LZ4)
  4. .sendTimeout(0, 時間単位.SECONDS)
  5. .enableBatching( true )
  6. .batchingMaxPublishDelay(10, 時間単位.ミリ秒)
  7. .バッチ処理最大メッセージ数(1000)
  8. .maxPendingMessages(1000)
  9. .blockIfQueueFull( true )
  10. .roundRobinRouterBatchingPartitionSwitchFrequency(10)
  11. .batcherBuilder( BatcherBuilder.DEFAULT )
  12. 。作成する();

プロデューサーを作成すると、実際にクラスターに接続します。クラスターに問題がある場合は、接続エラーが報告されます。

プロデューサーを作成するためのパラメータについて次に説明します。

トピック: プロデューサーが書きたいトピック。

compressionType: 圧縮戦略。現在、4 つの戦略がサポートされています (NONE、LZ4、ZLIB、ZSTD)。 Pulsar 2.3 以降では、この戦略はコンシューマー バージョンが 2.3 以上の場合にのみ有効になります。

sendTimeout: タイムアウト期間。プロデューサーがタイムアウト期間内に ACK を受信しない場合、メッセージを再送信します。

enableBatching: メッセージのバッチ処理を有効にするかどうか。デフォルト値は true です。このパラメータは、非同期送信 (sendAsync) の場合にのみ有効です。同期送信を選択した場合は無効になります。

batchingMaxPublishDelay: メッセージをバッチで送信する期間。ここでは10msと定義します。バッチ処理時間を設定すると、メッセージの数によって影響を受けなくなることに注意してください。バッチ送信では、送信するバッチ メッセージを 1 つのネットワーク パケットに入れて送信するため、ネットワーク IO 回数が削減され、ネットワーク カードの送信効率が大幅に向上します。

batchingMaxMessages: バッチで送信されるメッセージの最大数。

maxPendingMessages: ブローカーからの ACK の受信を待機しているメッセージ キューの最大長。キューがいっぱいの場合、blockIfQueueFull 値が true に設定されていない限り、プロデューサーのすべての sendAsync および send 呼び出しは失敗します。

blockIfQueueFull: プロデューサーがメッセージを送信すると、まずそのメッセージがローカル キュー キャッシュに格納されます。キャッシュがいっぱいになると、メッセージの送信がブロックされます。

roundRobinRouterBatchingPartition-SwitchFrequency: メッセージを送信するときにキーが指定されていない場合、デフォルトでラウンドロビン方式を使用してメッセージが送信されます。ラウンドロビン方式を使用する場合、パーティション切り替え周期は (頻度 * batchingMaxPublishDelay) になります。

消費者の創造

Pulsar の消費モデルは次のとおりです。

図からわかるように、コンシューマーは消費する前にサブスクリプションをバインドする必要があります。

  1. Consumer = client.newConsumer()
  2. .topic(トピック)
  3. .subscriptionName(サブスクリプション)
  4. .subscriptionType(サブスクリプションタイプ.Shared)
  5. .subscriptionInitialPosition(サブスクリプションの初期位置.Earliest)
  6. .negativeAck再配信遅延(60, TimeUnit.SECONDS)
  7. .レシーバーキューサイズ(1000)
  8. 。購読する();

以下では、コンシューマーを作成するためのパラメータについて説明します。

トピック: コンシューマーがサブスクライブするトピック。

subscriptionName: コンシューマーが関連付けるサブスクリプションの名前。

subscriptionType: サブスクリプションの種類。 Pulsar は次の 4 種類のサブスクリプションをサポートしています。

排他: 排他モード。同じトピックに対してコンシューマーは 1 つしか存在できません。コンシューマーが複数存在する場合、エラーが発生します。フェイルオーバー: 災害復旧モード。同じトピックには複数のコンシューマーが存在する可能性がありますが、消費できるのは 1 つのコンシューマーだけです。他のコンシューマーはフェイルオーバー バックアップとして機能します。現在のコンシューマーに障害が発生した場合、バックアップ コンシューマーの 1 つが選択されて消費されます。以下のように表示されます。

共有: 共有モードでは、同じトピックを複数のコンシューマーがサブスクライブして使用できます。メッセージはラウンドロビン ポーリング メカニズムを通じてさまざまなコンシューマーに配信され、各メッセージは 1 つのコンシューマーにのみ配信されます。コンシューマーが切断された場合、そのコンシューマーに送信されたメッセージが消費されていない場合、これらのメッセージは他の存続しているコンシューマーに再配布されます。以下のように表示されます。

Key_Shared: メッセージとコンシューマーの両方がキーにバインドされ、メッセージは同じキーにバインドされたコンシューマーにのみ送信されます。新しいコンシューマーが接続を確立するか、コンシューマーが切断された場合、一部のメッセージのキーを更新する必要があります。 Shared モードと比較すると、Key_Shared の利点は、同じキーのメッセージの順序を確保しながら、コンシューマーが同時にメッセージを消費できることです。以下のように表示されます。

subscriptionInitialPosition: 新しいサブスクリプションを作成するときに消費を開始する場所。次の 2 つのオプションがあります。

最新: 最新のメッセージから消費を開始します。最古: 最も古いメッセージから消費を開始します。

negativeAckRedeliveryDelay: 消費失敗後にブローカーが再送信する間隔。

受信キューサイズ: 受信メソッドが呼び出される前に蓄積できるメッセージの最大数。 0 に設定すると、ブローカーから一度にプルされるメッセージは 1 つだけになります。共有モードでは、バッチ メッセージが複数のコンシューマーに送信され、他のコンシューマーがアイドル状態になるのを防ぐために、receiverQueueSize は 0 に設定されます。

コンシューマーがメッセージを受信する方法は、同期単一、同期バッチ、非同期単一、非同期バッチの 4 つがあります。コードは次のとおりです。

  1. メッセージ message = consumer.receive()
  2. CompletableFuture<Message> メッセージ = consumer.receiveAsync();
  3. メッセージ message = consumer.batchReceive();
  4. CompletableFuture<Messages> メッセージ = consumer.batchReceiveAsync();

バッチ受信の場合は、バッチ受信戦略も設定できます。コードは次のとおりです。

  1. Consumer = client.newConsumer()
  2. .topic(トピック)
  3. .subscriptionName(サブスクリプション)
  4. .batchReceivePolicy(BatchReceivePolicy.builder()
  5. .最大メッセージ数(100)
  6. .maxNumBytes(1024 * 1024)
  7. .timeout(200, 時間単位.ミリ秒)
  8. 。建てる())
  9. 。購読する();

コード内のパラメータは次のように記述されます。

maxNumMessages: バッチで受信されるメッセージの最大数。 maxNumBytes: 受信したバッチメッセージのサイズ。ここでは 1MB です。

テスト

まず、Producer がメッセージを送信するためのコードを次のように記述します。

  1. パブリックvoid sendMsg(文字列キー、文字列データ) {
  2. CompletableFuture<MessageId> future = producer.newMessage()
  3. キー(キー)
  4. .value(data.getBytes()).sendAsync();
  5. future.handle((v, ex) -> {
  6. if (ex == null ) {
  7. logger.info( "メッセージは正常に送信されました、キー:{}、メッセージ:{}" キー、データ);
  8. }それ以外{
  9. logger.error( "メッセージの送信に失敗しました。キー:{}、メッセージ:{}" キー、データ);
  10. }
  11. 戻る ヌル;
  12. });
  13. 将来参加();
  14. logger.info( "メッセージの送信が完了しました、キー:{}、メッセージ:{}" キー、データ);
  15. }

次に、メッセージを消費するための Consumer コードを次のように記述します。

  1. パブリックvoid start() は例外をスローします{
  2. )の間{
  3. メッセージ message = consumer.receive();
  4. 文字列キー= message.getKey();
  5. 文字列データ = new String(message.getData());
  6. 文字列トピック = message.getTopicName();
  7. StringUtils.isNotEmpty(データ)の場合{
  8. 試す{
  9. logger.info( "受信メッセージ、トピック:{}、キー:{}、データ:{}" 、トピック、キー、データ);
  10. }catch(例外 e){
  11. logger.error( "メッセージ受信例外、トピック:{}、キー:{}、データ:{}" 、トピック、キー、データ、e);
  12. }
  13. }
  14. 消費者はメッセージを確認します。
  15. }
  16. }

最後に、Producer を呼び出してメッセージを送信する Controller クラスを記述します。コードは次のとおりです。

  1. @RequestMapping( "/送信" )
  2. @レスポンス本文
  3. パブリック文字列送信(@RequestParam 文字列キー、@RequestParam 文字列データ) {
  4. logger.info( "メッセージ送信要求を受信しました、キー:{}、値:{}" キー、データ);
  5. pulsarProducer.sendMsg(キー、データ );
  6. 戻る  "成功" ;
  7. }

プロデューサーを呼び出して、キー = key1、データ = data1 のメッセージを送信します。具体的な操作は、ブラウザに次の URL を入力して Enter キーを押すことです。

  1. http://192.168.157.1:8083/pulsar/send?キー=key1&データ =data1

コンソールに次のログ出力が表示されます。

  1. 2022-01-08 22:42:33,199 [pulsar-client-io-6-1] [INFO] boot.pulsar.PulsarProducer - メッセージを正常に送信しました。キー: key1、メッセージ: data1
  2. 2022-01-08 22:42:33,200 [http-nio-8083- exec -1] [INFO] boot.pulsar.PulsarProducer - メッセージの送信が完了しました。キー: key1、メッセージ: data1
  3. 2022-01-08 22:42:33,232 [スレッド-22] [INFO] boot.pulsar.PulsarConsumer - メッセージを受信しました。トピック:persistent:// public / default /testTopic、キー: key1、データ: data1
  4. 2022-01-08 22:43:14,498 [pulsar-timer-5-1] [INFO] org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [testTopic] [topicGroup] [7def6] プリフェッチされたメッセージ: 0 --- 受信した消費スループット: 0.02 メッセージ/秒 --- 0.00 メガビット/秒 --- 確認応答送信レート: 0.02 確認応答/秒 --- 失敗したメッセージ: 0 --- バッチメッセージ: 0 --- 失敗した確認応答: 0  
  5. 2022-01-08 22:43:14,961 [pulsar-timer-9-1] [INFO] org.apache.pulsar.client.impl.ProducerStatsRecorderImpl - [testTopic] [standalone-9-0] 保留中のメッセージ: 0 --- パブリッシュ スループット: 0.02 メッセージ/秒 --- 0.00 メガビット/秒 --- レイテンシ: 中: 69.000 ミリ秒 - 95 パーセント: 69.000 ミリ秒 - 99 パーセント: 69.000 ミリ秒 - 99.9 パーセント: 69.000 ミリ秒 - 最大: 69.000 ミリ秒 --- 確認応答受信率: 0.02 確認応答/秒 --- 失敗したメッセージ: 0  

ログから、ここで使用されている名前空間は、クラスターの作成時に生成された public/default であることがわかります。

要約する

SpringBoot と Java クライアントの統合の観点から見ると、Pulsar の API は非常にフレンドリーで使いやすいです。 Consumer を使用する場合は、バッチ、非同期、サブスクリプション タイプなど、さらに考慮する必要があります。

<<:  企業情報化の集中構築は分散ユニットアーキテクチャに戻るべきである

>>:  マルチクラウドトランスポートネットワークシステムの構築方法

推薦する

効果を最大化するためにソフト記事のプロモーションチャネルを選択するにはどうすればよいでしょうか?

はじめに:ソフト記事のプロモーション チャネルを選択することは決して難しいことではありません。難しい...

ウェブサイトの降格問題を的確に解決する方法

SEO に携わっている友人の多くは、毎日仕事場に到着してコンピューターの電源を入れた後、習慣的に最初...

事例分析:WeChatエコシステムのマーケティング価値!

今年8月の人気記事「6年、公会計が運命を変えた」は、春秋文体で公会計の発展に壮大な雰囲気を与えた。実...

三亜市が「観光客への過剰請求」事件を振り返り、危機から学んだ教訓を微博で広報

辰年の春節が始まった頃、三亜の「観光客ぼったくり事件」は大東海に投下された重い世論爆弾のようであり、...

正直に言うと、SEO 業界はどれくらい奥が深いのでしょうか?

SEO は 1997 年に始まり、百度よりも古い 15 年以上の歴史があると一般に認識されています。...

SEO ウェブサイト最適化 リソース構築 ブログ運用 スキル共有

検索エンジンのアルゴリズムの調整に伴い、主要な検索エンジンによるブログの認知度は、大多数のSEO担当...

WeiboとWeChatのどちらのマーケティング価値が高いでしょうか?

WeiboとWeChatを比較すると、どちらの方がマーケティング価値があるのでしょうか?これは多くの...

Baidu が頻繁に新製品をリリースしていることについての SEO 専門家の推測

2012年はBaiduにとって激動の年であり、私たちウェブマスターにとっても激動の年でした。長年SE...

CosmosTeck - $9.9/年/512MB RAM/5GB HDD/500GB Flow/ロサンゼルス/ダラス/フェニックス

CosmosTeck はバンドワゴンのリズムを学ぼうとしているのでしょうか? 512M メモリを搭載...

Windows サーバー セキュリティに関する 7 つのヒント

最近頻発しているネットワーク セキュリティ インシデントは、ネットワーク セキュリティに注意を払うに...

Baidu K Station の裏にある真実: ウェブマスターが宣戦布告する無駄なものにとらわれないようにしましょう

6月28日のBaidu Kステーション事件は、2012年にインターネットに影響を与えた最もホットな出...

#blackfriday# hostus - 年間 50 ドル / 6G メモリ / 200g ハードドライブ / 7T トラフィック / 7 つのデータセンター

Hostus のブラック フライデー プロモーションが始まりました。6G メモリを搭載した超ハイエン...

SEO初心者の視点から見たダイヤモンドバード

EBAY発祥のオンラインダイヤモンドブランド「Diamond Bird」は、わずか数年でインターネッ...

ウェブサイトのユーザー エクスペリエンス分析: 視線の動きに関する 12 の誤解

視線追跡技術(以下、眼球運動)の応用については学界やビジネス界でも注目されており、時折白熱した議論が...