Golang 言語の Kafka クライアント ライブラリ Sarama

Golang 言語の Kafka クライアント ライブラリ Sarama

01. はじめに

Apache Kafka はオープンソースのメッセージング エンジン システムです。プロジェクトにおけるその主な役割は、ピーク負荷を軽減し、谷を埋めて分離することです。この記事では、Apache Kafka 用の Golang クライアント ライブラリである Sarama のみを紹介します。 Sarama は、Apache Kafka 0.8 以降用の MIT ライセンスの Golang クライアント ライブラリです。

Apache Kafka サーバーに慣れていない場合は、まず公式ドキュメントの「Getting Started」セクションを読むことをお勧めします。この記事では、Apache Kafka 2.8 バージョンを使用します。

[[397879]]

02. プロデューサー

Sarama ライブラリの AsyncProducer または SyncProducer を使用してメッセージを生成できます。ほとんどの場合、メッセージを生成するには AsyncProducer を使用することをお勧めします。チャネルを通じてメッセージを受信し、バックグラウンドで可能な限り効率的に非同期的にメッセージを生成します。

SyncProducer は、Kafka メッセージを送信した後、ACK 確認を受信するまでブロックします。 SyncProducer には 2 つの注意点があります。一般的に効率が低く、実際の耐久性の保証は Producer.RequiredAcks の構成値によって異なります。構成によっては、SyncProducer によって確認されたメッセージが失われる場合もありますが、使用方法はより簡単になります。

読者の理解を深めるために、この記事では SyncProducer をプロデューサーとして使用する方法を紹介します。 AsyncProducer をプロデューサーとして使用する方法を知りたい読者は、公式ドキュメントを参照してください。

SyncProducer をプロデューサーとして使用するサンプル コード:

  1. func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {
  2. プロデューサー、エラー:= sarama.NewSyncProducer(brokerAddr, config)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = プロデューサーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. メッセージ:= &sarama.ProducerMessage{
  14. トピック: トピック、
  15. 値: 値、
  16. }
  17. パーティション、オフセット、エラー:=producer.SendMessage(msg)
  18. err != nil の場合 {
  19. fmt.Println(エラー)
  20. 戻る 
  21. }
  22. fmt.Printf( "パーティション:%d オフセット:%d\n" , パーティション, オフセット)
  23. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewSyncProducer() を呼び出して新しい SyncProducer を作成します。 SendMessage() を呼び出すと、指定されたメッセージが生成され、生成が成功したか失敗したかの場合にのみ戻ります。生成されたメッセージのパーティションとオフセットを返します。メッセージの生成が失敗した場合はエラーを返します。

リークを回避するには、プロデューサーで Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに自動的にガベージ コレクションされない可能性があるためです。

03. 消費者

Sarama ライブラリの Consumer または ConsumerGroup API を使用してメッセージを消費できます。読者の理解を容易にするために、この記事では Consumer を使用してメッセージを消費する方法を紹介します。

Consumer は、ブローカーからの Kafka メッセージを処理する PartitionConsumers を管理します。

メッセージを消費するコンシューマーのサンプル コード:

  1. func consumer (brokenAddr []string, topic string, partition int32, offset int64) {
  2. 消費者、エラー:= sarama.NewConsumer(brokenAddr, nil)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = コンシューマーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. パーティションコンシューマー、エラー:= consumer.ConsumePartition(トピック、パーティション、オフセット)
  14. err != nil の場合 {
  15. fmt.Println(エラー)
  16. 戻る 
  17. }
  18. 遅延関数() {
  19. err = partitionConsumer の場合。近い();エラー != ゼロ {
  20. fmt.Println(エラー)
  21. 戻る 
  22. }
  23. }()
  24. msg := range の場合、partitionConsumer.Messages() {
  25. fmt.Printf( "パーティション:%d オフセット:%d キー:%s 値:%s\n" , msg.Partition, msg.Offset, msg.Key , msg.Value)
  26. }
  27. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewConsumer() を呼び出して新しいコンシューマーを作成します。トピック、パーティション、オフセットを指定して、ConsumePartition() を呼び出して PartitionConsumer を作成します。 PartitionConsumer は、指定されたトピックとパーティションからの Kafka メッセージを処理します。

リークを防ぐために、consumer とpartitionConsumer で Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに、自動的にガベージ コレクションされない可能性があるためです。

04. 結論

この記事では主に、Apache Kafka の Golang 言語クライアント ライブラリ Sarama を使用して Kafka メッセージを生成および消費する方法を紹介します。生産者と消費者の両方に対して簡単な例を示します。さらに、Sarama ライブラリは他の多くの API も提供します。興味のある読者は、公式ドキュメントを読んで詳細を確認してください。

<<:  クラウド移行を実施する前にコストを計算する方法

>>:  Tencent Qianfanと提携し、SalesEasy PaaSプラットフォームが企業のアプリケーションのカスタマイズを支援

推薦する

企業サイトの SEO トラフィックコンバージョン率に影響を与える要因の分析例

お客様から「企業のウェブサイトのSEOトラフィックのコンバージョン率を向上させるにはどうすればよいか...

123systems - 3 年間 $34/2IP/3G メモリ/75g ハード ドライブ/3T トラフィック/4 コンピュータ ルーム

コロクロッシングが買収したブランドである123ystemsは、長い間登場していません。ホストキャット...

検索エンジンは頻繁にアルゴリズムを更新しており、SEO業界は将来的に圧縮されるだろう

みなさんこんにちは。私の名前はLiang Lei、オンライン名はStoneです。 6月以来、Baid...

AWSはアマゾンから分離される可能性、CEOは合意に従うと語る

最近、海外メディアCNBCによると、アマゾンのクラウドコンピューティング事業のCEO、アンディ・ジャ...

JD.com の 3 段階分類ページの SEO、ユーザー エクスペリエンス、マーケティング手法の簡単な分析 (パート 2)

昨日の「JDの3階層カテゴリーページのSEO、ユーザーエクスペリエンス、マーケティング手法の簡単な分...

gamedatainc-2g メモリ/80g ハードディスク/1T トラフィック/月額 5.95 ドル (IP は十分安価です)

Gamedataincは、VPSをメインに展開するgamedata傘下の新運営ブランドです! Gam...

ホストペア-$7.5/Windows/512m メモリ/15g ハードディスク/500g ハードディスク/G ポート/ダラス

HostPair LLC は、KVM 仮想環境に基づく Windows VPS をインストールしたこ...

domVPS-512M メモリ KVM 6.5 ドル ニュージャージー州、米国

domVPS は 2010 年に設立されました。同社の VPS はローエンドですが、全体的には問題あ...

Open Policy Agent で Kubernetes を保護する方法

コンテナ化されたアプリケーションを本番環境に移行する組織が増えるにつれて、Kubernetes はプ...

KOL マーケティング チャネルの変換を評価、監視、促進するにはどうすればよいでしょうか?

インターネットの急速な発展により、製品が王様の時代ではなく、コンテンツが王様の時代になりました。eコ...

優れた SEO にはどのような資質が必要ですか?

SEO には多くのことが関係しますが、始めるのは非常に簡単です。インターネットとコンピューターの操作...

ウェブサイトを分析する能力は、初心者ウェブマスターにとって必須のスキルです。

まず、なぜ「ウェブサイトの分析が得意であることは、初心者ウェブマスターにとって必須のスキルです」とい...

分散WebSocketソリューションについてお話しましょう

序文最近、自分でプロジェクトを構築しました。プロジェクト自体は非常にシンプルですが、メッセージリマイ...

衝動的な中国の電子商取引は、回り道を避けるために他から学ぶしかない

皆さん、こんにちは。最近、済南の天気は本当に暑いです。外を歩くとサウナに入るようなもので、思わず汗を...