Golang 言語の Kafka クライアント ライブラリ Sarama

Golang 言語の Kafka クライアント ライブラリ Sarama

01. はじめに

Apache Kafka はオープンソースのメッセージング エンジン システムです。プロジェクトにおけるその主な役割は、ピーク負荷を軽減し、谷を埋めて分離することです。この記事では、Apache Kafka 用の Golang クライアント ライブラリである Sarama のみを紹介します。 Sarama は、Apache Kafka 0.8 以降用の MIT ライセンスの Golang クライアント ライブラリです。

Apache Kafka サーバーに慣れていない場合は、まず公式ドキュメントの「Getting Started」セクションを読むことをお勧めします。この記事では、Apache Kafka 2.8 バージョンを使用します。

[[397879]]

02. プロデューサー

Sarama ライブラリの AsyncProducer または SyncProducer を使用してメッセージを生成できます。ほとんどの場合、メッセージを生成するには AsyncProducer を使用することをお勧めします。チャネルを通じてメッセージを受信し、バックグラウンドで可能な限り効率的に非同期的にメッセージを生成します。

SyncProducer は、Kafka メッセージを送信した後、ACK 確認を受信するまでブロックします。 SyncProducer には 2 つの注意点があります。一般的に効率が低く、実際の耐久性の保証は Producer.RequiredAcks の構成値によって異なります。構成によっては、SyncProducer によって確認されたメッセージが失われる場合もありますが、使用方法はより簡単になります。

読者の理解を深めるために、この記事では SyncProducer をプロデューサーとして使用する方法を紹介します。 AsyncProducer をプロデューサーとして使用する方法を知りたい読者は、公式ドキュメントを参照してください。

SyncProducer をプロデューサーとして使用するサンプル コード:

  1. func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {
  2. プロデューサー、エラー:= sarama.NewSyncProducer(brokerAddr, config)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = プロデューサーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. メッセージ:= &sarama.ProducerMessage{
  14. トピック: トピック、
  15. 値: 値、
  16. }
  17. パーティション、オフセット、エラー:=producer.SendMessage(msg)
  18. err != nil の場合 {
  19. fmt.Println(エラー)
  20. 戻る 
  21. }
  22. fmt.Printf( "パーティション:%d オフセット:%d\n" , パーティション, オフセット)
  23. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewSyncProducer() を呼び出して新しい SyncProducer を作成します。 SendMessage() を呼び出すと、指定されたメッセージが生成され、生成が成功したか失敗したかの場合にのみ戻ります。生成されたメッセージのパーティションとオフセットを返します。メッセージの生成が失敗した場合はエラーを返します。

リークを回避するには、プロデューサーで Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに自動的にガベージ コレクションされない可能性があるためです。

03. 消費者

Sarama ライブラリの Consumer または ConsumerGroup API を使用してメッセージを消費できます。読者の理解を容易にするために、この記事では Consumer を使用してメッセージを消費する方法を紹介します。

Consumer は、ブローカーからの Kafka メッセージを処理する PartitionConsumers を管理します。

メッセージを消費するコンシューマーのサンプル コード:

  1. func consumer (brokenAddr []string, topic string, partition int32, offset int64) {
  2. 消費者、エラー:= sarama.NewConsumer(brokenAddr, nil)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = コンシューマーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. パーティションコンシューマー、エラー:= consumer.ConsumePartition(トピック、パーティション、オフセット)
  14. err != nil の場合 {
  15. fmt.Println(エラー)
  16. 戻る 
  17. }
  18. 遅延関数() {
  19. err = partitionConsumer の場合。近い();エラー != ゼロ {
  20. fmt.Println(エラー)
  21. 戻る 
  22. }
  23. }()
  24. msg := range の場合、partitionConsumer.Messages() {
  25. fmt.Printf( "パーティション:%d オフセット:%d キー:%s 値:%s\n" , msg.Partition, msg.Offset, msg.Key , msg.Value)
  26. }
  27. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewConsumer() を呼び出して新しいコンシューマーを作成します。トピック、パーティション、オフセットを指定して、ConsumePartition() を呼び出して PartitionConsumer を作成します。 PartitionConsumer は、指定されたトピックとパーティションからの Kafka メッセージを処理します。

リークを防ぐために、consumer とpartitionConsumer で Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに、自動的にガベージ コレクションされない可能性があるためです。

04. 結論

この記事では主に、Apache Kafka の Golang 言語クライアント ライブラリ Sarama を使用して Kafka メッセージを生成および消費する方法を紹介します。生産者と消費者の両方に対して簡単な例を示します。さらに、Sarama ライブラリは他の多くの API も提供します。興味のある読者は、公式ドキュメントを読んで詳細を確認してください。

<<:  クラウド移行を実施する前にコストを計算する方法

>>:  Tencent Qianfanと提携し、SalesEasy PaaSプラットフォームが企業のアプリケーションのカスタマイズを支援

推薦する

SEO初心者は傲慢さを捨てなければならない

今、SEO業界に参入しようと言うと、他の人から強く思いとどまられるでしょう。なぜなら、現状では、情報...

Rackhost - $4.99/Kvm/Windows/1g メモリ/250g ハードディスク/1000M/無制限トラフィック

rack.sx は、2002 年からインターネット業界に携わっていると主張しています。ITDataT...

入札と SEO は、病院の Web サイトの収益性の出発点でも終点でもすべてでもありません。

インターネットが人々の生活にますます溶け込むにつれて、あらゆる業界が知らず知らずのうちにインターネッ...

SEO最適化の最高レベルは口コミ最適化です

これまで、SEO について話すとき、それは主に、特定のキーワードで検索したときに自分の Web サイ...

secureragon-特別版DDOS保護VPS

secureragon はトップクラスの VPS 業者です。リソースをいかにケチっているかがよくわか...

推奨: liquidweb-79 USD/E3-1271v3/8 GB RAM/250 GB SSD/raid1/5 TB トラフィック

liquidweb.com は、19 年の歴史を持つ有名なハイエンド (安物ではない)ホスティング ...

Baidu のスナップショットが多くのウェブマスターを困惑させる

最近、ウェブマスターの間で「Baiduスナップショット」という話題が話題になっています。Baiduス...

ジャック・マーが33億人民元で恒生グループを買収:恒生電子の筆頭株主に

新浪科技は4月3日早朝、恒生電子が発表した発表によると、ジャック・マーは浙江栄鑫ネットワークテクノロ...

品質を向上させ、入札を簡単にマスターするためのクリエイティブ最適化のコツ

品質は、検索エンジンマーケティング (SEM) において非常に重要な要素です。アカウント全体の最適化...

pzea: 香港 VPS、シンガポール VPS、日本 VPS、すべて 30% オフ、一部 50% オフ、Windows 対応

pzea は割引を提供しています: シンガポール VPS、香港 VPS、日本 VPS はすべて、月払...

毎日のトピック: ビットコイン取引プラットフォームは閉鎖に直面していますか?ビットコインの全盛期は終わったのか?

A5ウェブマスターネットワーク(www.admin5.com)は4月4日、中国のビットコイン取引プラ...

インサイト: VUCA時代のデジタル人材管理を考える

[51CTO.comより引用] 2020年のCOVID-19パンデミックの発生と米中貿易戦争の激化に...

クラウド サービス OpenAPI の 7 つの主要な課題: アーキテクトはどのように対処すべきでしょうか?

[[279233]] API は、モジュールまたはサブシステム間の相互作用のためのインターフェース定...

TUN デバイスの魔法 - フランネル UDP モード

みなさんこんにちは。私は次男です。 「トロイの木馬 - 図解 VXLAN コンテナ ネットワーク通信...

クラウドセキュリティを企業のイノベーションの推進力にし、「タマネギ」スタイルの多層保護を構築しましょう

クラウドへの移行は、企業のデジタル構築における新たな標準となっています。企業はクラウドでのイノベーシ...