みなさんこんにちは。ジュン兄です。 優れたメッセージストリーミングプラットフォームとして、Pulsar はますます使用されています。この記事では、Pulsar の Java クライアントについて説明します。 パルサーを展開するPulsar をデプロイするには、ローカル バイナリ インストール、Docker デプロイ、Kubernetes へのデプロイという 3 つの主な方法があります。 この記事では、docker を使用して単一ノードの Pulsar クラスターをデプロイします。実験環境は2コアCPU、4Gメモリです。 デプロイメントコマンドは次のとおりです。
インストール プロセス中に次のエラーが発生する可能性があります。
これは、docker のバージョンが低く、マウント パラメータをサポートしていないためです。 docker バージョンを 17.06 以上にアップグレードするだけです。 ネットワーク上の理由により、展開プロセスが失敗する可能性がありますが、数回試行すると成功する可能性があります。次のログが表示されれば、起動は成功したことになります。
ローカルのシングルノードクラスタが起動すると、public/defaultという名前空間が作成されます。 パルサークライアント現在、Pulsar は以下を含む複数の言語でクライアントをサポートしています。 Java クライアント Go クライアント Python クライアント C++ クライアント Node.js クライアント WebSocket クライアント C# クライアント SpringBoot 構成SpringBoot を使用して Pulsar クライアントを統合します。まず、Pulsar クライアントの依存関係を導入します。コードは次のとおりです。
次に、プロパティ ファイルに構成を追加します。
クライアントの作成クライアントの作成は非常に簡単で、コードは次のとおりです。
上記の URL は、プロパティ ファイルで定義されている pulsar.url です。 クライアントを作成するときに、クラスターが正常に起動されなかったとしても、実際にはまだクラスターに接続されていないため、プログラムはエラーを報告しません。 プロデューサーを作成
プロデューサーを作成すると、実際にクラスターに接続します。クラスターに問題がある場合は、接続エラーが報告されます。 プロデューサーを作成するためのパラメータについて次に説明します。 トピック: プロデューサーが書きたいトピック。 compressionType: 圧縮戦略。現在、4 つの戦略がサポートされています (NONE、LZ4、ZLIB、ZSTD)。 Pulsar 2.3 以降では、この戦略はコンシューマー バージョンが 2.3 以上の場合にのみ有効になります。 sendTimeout: タイムアウト期間。プロデューサーがタイムアウト期間内に ACK を受信しない場合、メッセージを再送信します。 enableBatching: メッセージのバッチ処理を有効にするかどうか。デフォルト値は true です。このパラメータは、非同期送信 (sendAsync) の場合にのみ有効です。同期送信を選択した場合は無効になります。 batchingMaxPublishDelay: メッセージをバッチで送信する期間。ここでは10msと定義します。バッチ処理時間を設定すると、メッセージの数によって影響を受けなくなることに注意してください。バッチ送信では、送信するバッチ メッセージを 1 つのネットワーク パケットに入れて送信するため、ネットワーク IO 回数が削減され、ネットワーク カードの送信効率が大幅に向上します。 batchingMaxMessages: バッチで送信されるメッセージの最大数。 maxPendingMessages: ブローカーからの ACK の受信を待機しているメッセージ キューの最大長。キューがいっぱいの場合、blockIfQueueFull 値が true に設定されていない限り、プロデューサーのすべての sendAsync および send 呼び出しは失敗します。 blockIfQueueFull: プロデューサーがメッセージを送信すると、まずそのメッセージがローカル キュー キャッシュに格納されます。キャッシュがいっぱいになると、メッセージの送信がブロックされます。 roundRobinRouterBatchingPartition-SwitchFrequency: メッセージを送信するときにキーが指定されていない場合、デフォルトでラウンドロビン方式を使用してメッセージが送信されます。ラウンドロビン方式を使用する場合、パーティション切り替え周期は (頻度 * batchingMaxPublishDelay) になります。 消費者の創造Pulsar の消費モデルは次のとおりです。 図からわかるように、コンシューマーは消費する前にサブスクリプションをバインドする必要があります。
以下では、コンシューマーを作成するためのパラメータについて説明します。 トピック: コンシューマーがサブスクライブするトピック。 subscriptionName: コンシューマーが関連付けるサブスクリプションの名前。 subscriptionType: サブスクリプションの種類。 Pulsar は次の 4 種類のサブスクリプションをサポートしています。 排他: 排他モード。同じトピックに対してコンシューマーは 1 つしか存在できません。コンシューマーが複数存在する場合、エラーが発生します。フェイルオーバー: 災害復旧モード。同じトピックには複数のコンシューマーが存在する可能性がありますが、消費できるのは 1 つのコンシューマーだけです。他のコンシューマーはフェイルオーバー バックアップとして機能します。現在のコンシューマーに障害が発生した場合、バックアップ コンシューマーの 1 つが選択されて消費されます。以下のように表示されます。 共有: 共有モードでは、同じトピックを複数のコンシューマーがサブスクライブして使用できます。メッセージはラウンドロビン ポーリング メカニズムを通じてさまざまなコンシューマーに配信され、各メッセージは 1 つのコンシューマーにのみ配信されます。コンシューマーが切断された場合、そのコンシューマーに送信されたメッセージが消費されていない場合、これらのメッセージは他の存続しているコンシューマーに再配布されます。以下のように表示されます。 Key_Shared: メッセージとコンシューマーの両方がキーにバインドされ、メッセージは同じキーにバインドされたコンシューマーにのみ送信されます。新しいコンシューマーが接続を確立するか、コンシューマーが切断された場合、一部のメッセージのキーを更新する必要があります。 Shared モードと比較すると、Key_Shared の利点は、同じキーのメッセージの順序を確保しながら、コンシューマーが同時にメッセージを消費できることです。以下のように表示されます。 subscriptionInitialPosition: 新しいサブスクリプションを作成するときに消費を開始する場所。次の 2 つのオプションがあります。 最新: 最新のメッセージから消費を開始します。最古: 最も古いメッセージから消費を開始します。 negativeAckRedeliveryDelay: 消費失敗後にブローカーが再送信する間隔。 受信キューサイズ: 受信メソッドが呼び出される前に蓄積できるメッセージの最大数。 0 に設定すると、ブローカーから一度にプルされるメッセージは 1 つだけになります。共有モードでは、バッチ メッセージが複数のコンシューマーに送信され、他のコンシューマーがアイドル状態になるのを防ぐために、receiverQueueSize は 0 に設定されます。 コンシューマーがメッセージを受信する方法は、同期単一、同期バッチ、非同期単一、非同期バッチの 4 つがあります。コードは次のとおりです。
バッチ受信の場合は、バッチ受信戦略も設定できます。コードは次のとおりです。
コード内のパラメータは次のように記述されます。 maxNumMessages: バッチで受信されるメッセージの最大数。 maxNumBytes: 受信したバッチメッセージのサイズ。ここでは 1MB です。 テストまず、Producer がメッセージを送信するためのコードを次のように記述します。
次に、メッセージを消費するための Consumer コードを次のように記述します。
最後に、Producer を呼び出してメッセージを送信する Controller クラスを記述します。コードは次のとおりです。
プロデューサーを呼び出して、キー = key1、データ = data1 のメッセージを送信します。具体的な操作は、ブラウザに次の URL を入力して Enter キーを押すことです。
コンソールに次のログ出力が表示されます。
ログから、ここで使用されている名前空間は、クラスターの作成時に生成された public/default であることがわかります。 要約するSpringBoot と Java クライアントの統合の観点から見ると、Pulsar の API は非常にフレンドリーで使いやすいです。 Consumer を使用する場合は、バッチ、非同期、サブスクリプション タイプなど、さらに考慮する必要があります。 |
<<: 企業情報化の集中構築は分散ユニットアーキテクチャに戻るべきである
>>: マルチクラウドトランスポートネットワークシステムの構築方法
CloudShards は、Cloud Shards DBA Query Foundry, LLC ...
はじめに:ソフト記事のプロモーション チャネルを選択することは決して難しいことではありません。難しい...
SEO に携わっている友人の多くは、毎日仕事場に到着してコンピューターの電源を入れた後、習慣的に最初...
今年8月の人気記事「6年、公会計が運命を変えた」は、春秋文体で公会計の発展に壮大な雰囲気を与えた。実...
辰年の春節が始まった頃、三亜の「観光客ぼったくり事件」は大東海に投下された重い世論爆弾のようであり、...
SEO は 1997 年に始まり、百度よりも古い 15 年以上の歴史があると一般に認識されています。...
検索エンジンのアルゴリズムの調整に伴い、主要な検索エンジンによるブログの認知度は、大多数のSEO担当...
WeiboとWeChatを比較すると、どちらの方がマーケティング価値があるのでしょうか?これは多くの...
2012年はBaiduにとって激動の年であり、私たちウェブマスターにとっても激動の年でした。長年SE...
CosmosTeck はバンドワゴンのリズムを学ぼうとしているのでしょうか? 512M メモリを搭載...
最近頻発しているネットワーク セキュリティ インシデントは、ネットワーク セキュリティに注意を払うに...
6月28日のBaidu Kステーション事件は、2012年にインターネットに影響を与えた最もホットな出...
Hostus のブラック フライデー プロモーションが始まりました。6G メモリを搭載した超ハイエン...
EBAY発祥のオンラインダイヤモンドブランド「Diamond Bird」は、わずか数年でインターネッ...
視線追跡技術(以下、眼球運動)の応用については学界やビジネス界でも注目されており、時折白熱した議論が...