みなさんこんにちは。ジュン兄です。 優れたメッセージストリーミングプラットフォームとして、Pulsar はますます使用されています。この記事では、Pulsar の Java クライアントについて説明します。 パルサーを展開するPulsar をデプロイするには、ローカル バイナリ インストール、Docker デプロイ、Kubernetes へのデプロイという 3 つの主な方法があります。 この記事では、docker を使用して単一ノードの Pulsar クラスターをデプロイします。実験環境は2コアCPU、4Gメモリです。 デプロイメントコマンドは次のとおりです。
インストール プロセス中に次のエラーが発生する可能性があります。
これは、docker のバージョンが低く、マウント パラメータをサポートしていないためです。 docker バージョンを 17.06 以上にアップグレードするだけです。 ネットワーク上の理由により、展開プロセスが失敗する可能性がありますが、数回試行すると成功する可能性があります。次のログが表示されれば、起動は成功したことになります。
ローカルのシングルノードクラスタが起動すると、public/defaultという名前空間が作成されます。 パルサークライアント現在、Pulsar は以下を含む複数の言語でクライアントをサポートしています。 Java クライアント Go クライアント Python クライアント C++ クライアント Node.js クライアント WebSocket クライアント C# クライアント SpringBoot 構成SpringBoot を使用して Pulsar クライアントを統合します。まず、Pulsar クライアントの依存関係を導入します。コードは次のとおりです。
次に、プロパティ ファイルに構成を追加します。
クライアントの作成クライアントの作成は非常に簡単で、コードは次のとおりです。
上記の URL は、プロパティ ファイルで定義されている pulsar.url です。 クライアントを作成するときに、クラスターが正常に起動されなかったとしても、実際にはまだクラスターに接続されていないため、プログラムはエラーを報告しません。 プロデューサーを作成
プロデューサーを作成すると、実際にクラスターに接続します。クラスターに問題がある場合は、接続エラーが報告されます。 プロデューサーを作成するためのパラメータについて次に説明します。 トピック: プロデューサーが書きたいトピック。 compressionType: 圧縮戦略。現在、4 つの戦略がサポートされています (NONE、LZ4、ZLIB、ZSTD)。 Pulsar 2.3 以降では、この戦略はコンシューマー バージョンが 2.3 以上の場合にのみ有効になります。 sendTimeout: タイムアウト期間。プロデューサーがタイムアウト期間内に ACK を受信しない場合、メッセージを再送信します。 enableBatching: メッセージのバッチ処理を有効にするかどうか。デフォルト値は true です。このパラメータは、非同期送信 (sendAsync) の場合にのみ有効です。同期送信を選択した場合は無効になります。 batchingMaxPublishDelay: メッセージをバッチで送信する期間。ここでは10msと定義します。バッチ処理時間を設定すると、メッセージの数によって影響を受けなくなることに注意してください。バッチ送信では、送信するバッチ メッセージを 1 つのネットワーク パケットに入れて送信するため、ネットワーク IO 回数が削減され、ネットワーク カードの送信効率が大幅に向上します。 batchingMaxMessages: バッチで送信されるメッセージの最大数。 maxPendingMessages: ブローカーからの ACK の受信を待機しているメッセージ キューの最大長。キューがいっぱいの場合、blockIfQueueFull 値が true に設定されていない限り、プロデューサーのすべての sendAsync および send 呼び出しは失敗します。 blockIfQueueFull: プロデューサーがメッセージを送信すると、まずそのメッセージがローカル キュー キャッシュに格納されます。キャッシュがいっぱいになると、メッセージの送信がブロックされます。 roundRobinRouterBatchingPartition-SwitchFrequency: メッセージを送信するときにキーが指定されていない場合、デフォルトでラウンドロビン方式を使用してメッセージが送信されます。ラウンドロビン方式を使用する場合、パーティション切り替え周期は (頻度 * batchingMaxPublishDelay) になります。 消費者の創造Pulsar の消費モデルは次のとおりです。 図からわかるように、コンシューマーは消費する前にサブスクリプションをバインドする必要があります。
以下では、コンシューマーを作成するためのパラメータについて説明します。 トピック: コンシューマーがサブスクライブするトピック。 subscriptionName: コンシューマーが関連付けるサブスクリプションの名前。 subscriptionType: サブスクリプションの種類。 Pulsar は次の 4 種類のサブスクリプションをサポートしています。 排他: 排他モード。同じトピックに対してコンシューマーは 1 つしか存在できません。コンシューマーが複数存在する場合、エラーが発生します。フェイルオーバー: 災害復旧モード。同じトピックには複数のコンシューマーが存在する可能性がありますが、消費できるのは 1 つのコンシューマーだけです。他のコンシューマーはフェイルオーバー バックアップとして機能します。現在のコンシューマーに障害が発生した場合、バックアップ コンシューマーの 1 つが選択されて消費されます。以下のように表示されます。 共有: 共有モードでは、同じトピックを複数のコンシューマーがサブスクライブして使用できます。メッセージはラウンドロビン ポーリング メカニズムを通じてさまざまなコンシューマーに配信され、各メッセージは 1 つのコンシューマーにのみ配信されます。コンシューマーが切断された場合、そのコンシューマーに送信されたメッセージが消費されていない場合、これらのメッセージは他の存続しているコンシューマーに再配布されます。以下のように表示されます。 Key_Shared: メッセージとコンシューマーの両方がキーにバインドされ、メッセージは同じキーにバインドされたコンシューマーにのみ送信されます。新しいコンシューマーが接続を確立するか、コンシューマーが切断された場合、一部のメッセージのキーを更新する必要があります。 Shared モードと比較すると、Key_Shared の利点は、同じキーのメッセージの順序を確保しながら、コンシューマーが同時にメッセージを消費できることです。以下のように表示されます。 subscriptionInitialPosition: 新しいサブスクリプションを作成するときに消費を開始する場所。次の 2 つのオプションがあります。 最新: 最新のメッセージから消費を開始します。最古: 最も古いメッセージから消費を開始します。 negativeAckRedeliveryDelay: 消費失敗後にブローカーが再送信する間隔。 受信キューサイズ: 受信メソッドが呼び出される前に蓄積できるメッセージの最大数。 0 に設定すると、ブローカーから一度にプルされるメッセージは 1 つだけになります。共有モードでは、バッチ メッセージが複数のコンシューマーに送信され、他のコンシューマーがアイドル状態になるのを防ぐために、receiverQueueSize は 0 に設定されます。 コンシューマーがメッセージを受信する方法は、同期単一、同期バッチ、非同期単一、非同期バッチの 4 つがあります。コードは次のとおりです。
バッチ受信の場合は、バッチ受信戦略も設定できます。コードは次のとおりです。
コード内のパラメータは次のように記述されます。 maxNumMessages: バッチで受信されるメッセージの最大数。 maxNumBytes: 受信したバッチメッセージのサイズ。ここでは 1MB です。 テストまず、Producer がメッセージを送信するためのコードを次のように記述します。
次に、メッセージを消費するための Consumer コードを次のように記述します。
最後に、Producer を呼び出してメッセージを送信する Controller クラスを記述します。コードは次のとおりです。
プロデューサーを呼び出して、キー = key1、データ = data1 のメッセージを送信します。具体的な操作は、ブラウザに次の URL を入力して Enter キーを押すことです。
コンソールに次のログ出力が表示されます。
ログから、ここで使用されている名前空間は、クラスターの作成時に生成された public/default であることがわかります。 要約するSpringBoot と Java クライアントの統合の観点から見ると、Pulsar の API は非常にフレンドリーで使いやすいです。 Consumer を使用する場合は、バッチ、非同期、サブスクリプション タイプなど、さらに考慮する必要があります。 |
<<: 企業情報化の集中構築は分散ユニットアーキテクチャに戻るべきである
>>: マルチクラウドトランスポートネットワークシステムの構築方法
Hostsolutions は、ルーマニアの Oradea データセンターで KVM シリーズ VP...
少し前、シルク・ドゥ・ソレイユが破産を申請したというニュースが、あらゆる階層から深い遺憾の意を引き起...
1. 本物のユーザーを見つけたいなら、アプリストアにお金を払おう●利点:実際のユーザーが多く、信頼性...
外部リンクは、ウェブサイトのキーワードランキングに影響を与える最も重要な要素の 1 つです。現在、ほ...
ウェブサイトの構築では、ウェブページのリダイレクトが必要な状況によく遭遇します。たとえば、ウェブペー...
海外VPS(海外VPSレンタル、海外VPSレンタル)をレンタルする場合、(1)国内市場と比較した速度...
クラウド ストレージ キャッシュにより、パフォーマンスを低下させる遅延を削減できます。これらのデバイ...
私はこの記事を世界を変えることを夢見る人々に捧げますシュ・シュン何年も経って、あの若いプロダクトマネ...
クラウド コンピューティングの最適化という概念は、企業がクラウド コンピューティングの価値を懸念して...
[[386470]]画像はPexelsより罠に落ちる「次兄さん、やっと来たね。計画通りにやろう。ハー...
ウェブサイトの構造は SEO の基礎です。ウェブサイト内の最適化は、大きく分けて 2 つの部分に分け...
今年上半期が終わりに近づく中、中国の化粧品業界ではどんな新たなトレンドが生まれているのでしょうか? ...
先日終了したメーデーの休日中、目的地やアトラクションでは、おなじみの「群衆に従う」休日パターンが再び...
少し前に、Baidu はサイトリンク機能を正式にリリースしました。これにより、多くの有名な大規模ウェ...
godaddy からわずか 12 ドルで、PayPal、クレジットカード、Alipay をサポートす...