Golang 言語の Kafka クライアント ライブラリ Sarama

Golang 言語の Kafka クライアント ライブラリ Sarama

01. はじめに

Apache Kafka はオープンソースのメッセージング エンジン システムです。プロジェクトにおけるその主な役割は、ピーク負荷を軽減し、谷を埋めて分離することです。この記事では、Apache Kafka 用の Golang クライアント ライブラリである Sarama のみを紹介します。 Sarama は、Apache Kafka 0.8 以降用の MIT ライセンスの Golang クライアント ライブラリです。

Apache Kafka サーバーに慣れていない場合は、まず公式ドキュメントの「Getting Started」セクションを読むことをお勧めします。この記事では、Apache Kafka 2.8 バージョンを使用します。

[[397879]]

02. プロデューサー

Sarama ライブラリの AsyncProducer または SyncProducer を使用してメッセージを生成できます。ほとんどの場合、メッセージを生成するには AsyncProducer を使用することをお勧めします。チャネルを通じてメッセージを受信し、バックグラウンドで可能な限り効率的に非同期的にメッセージを生成します。

SyncProducer は、Kafka メッセージを送信した後、ACK 確認を受信するまでブロックします。 SyncProducer には 2 つの注意点があります。一般的に効率が低く、実際の耐久性の保証は Producer.RequiredAcks の構成値によって異なります。構成によっては、SyncProducer によって確認されたメッセージが失われる場合もありますが、使用方法はより簡単になります。

読者の理解を深めるために、この記事では SyncProducer をプロデューサーとして使用する方法を紹介します。 AsyncProducer をプロデューサーとして使用する方法を知りたい読者は、公式ドキュメントを参照してください。

SyncProducer をプロデューサーとして使用するサンプル コード:

  1. func sendMessage (brokerAddr []string, config *sarama.Config, topic string, value sarama.Encoder) {
  2. プロデューサー、エラー:= sarama.NewSyncProducer(brokerAddr, config)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = プロデューサーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. メッセージ:= &sarama.ProducerMessage{
  14. トピック: トピック、
  15. 値: 値、
  16. }
  17. パーティション、オフセット、エラー:=producer.SendMessage(msg)
  18. err != nil の場合 {
  19. fmt.Println(エラー)
  20. 戻る 
  21. }
  22. fmt.Printf( "パーティション:%d オフセット:%d\n" , パーティション, オフセット)
  23. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewSyncProducer() を呼び出して新しい SyncProducer を作成します。 SendMessage() を呼び出すと、指定されたメッセージが生成され、生成が成功したか失敗したかの場合にのみ戻ります。生成されたメッセージのパーティションとオフセットを返します。メッセージの生成が失敗した場合はエラーを返します。

リークを回避するには、プロデューサーで Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに自動的にガベージ コレクションされない可能性があるためです。

03. 消費者

Sarama ライブラリの Consumer または ConsumerGroup API を使用してメッセージを消費できます。読者の理解を容易にするために、この記事では Consumer を使用してメッセージを消費する方法を紹介します。

Consumer は、ブローカーからの Kafka メッセージを処理する PartitionConsumers を管理します。

メッセージを消費するコンシューマーのサンプル コード:

  1. func consumer (brokenAddr []string, topic string, partition int32, offset int64) {
  2. 消費者、エラー:= sarama.NewConsumer(brokenAddr, nil)
  3. err != nil の場合 {
  4. fmt.Println(エラー)
  5. 戻る 
  6. }
  7. 遅延関数() {
  8. err = コンシューマーの場合。近い();エラー != ゼロ {
  9. fmt.Println(エラー)
  10. 戻る 
  11. }
  12. }()
  13. パーティションコンシューマー、エラー:= consumer.ConsumePartition(トピック、パーティション、オフセット)
  14. err != nil の場合 {
  15. fmt.Println(エラー)
  16. 戻る 
  17. }
  18. 遅延関数() {
  19. err = partitionConsumer の場合。近い();エラー != ゼロ {
  20. fmt.Println(エラー)
  21. 戻る 
  22. }
  23. }()
  24. msg := range の場合、partitionConsumer.Messages() {
  25. fmt.Printf( "パーティション:%d オフセット:%d キー:%s 値:%s\n" , msg.Partition, msg.Offset, msg.Key , msg.Value)
  26. }
  27. }

上記のコードを読むと、ブローカーのアドレスと構成情報を指定して、NewConsumer() を呼び出して新しいコンシューマーを作成します。トピック、パーティション、オフセットを指定して、ConsumePartition() を呼び出して PartitionConsumer を作成します。 PartitionConsumer は、指定されたトピックとパーティションからの Kafka メッセージを処理します。

リークを防ぐために、consumer とpartitionConsumer で Close() を呼び出す必要があることに注意することが重要です。スコープ外になったときに、自動的にガベージ コレクションされない可能性があるためです。

04. 結論

この記事では主に、Apache Kafka の Golang 言語クライアント ライブラリ Sarama を使用して Kafka メッセージを生成および消費する方法を紹介します。生産者と消費者の両方に対して簡単な例を示します。さらに、Sarama ライブラリは他の多くの API も提供します。興味のある読者は、公式ドキュメントを読んで詳細を確認してください。

<<:  クラウド移行を実施する前にコストを計算する方法

>>:  Tencent Qianfanと提携し、SalesEasy PaaSプラットフォームが企業のアプリケーションのカスタマイズを支援

推薦する

アマゾン ウェブ サービスが診断開発プログラムの新フェーズを開始し、資金提供範囲をさらに拡大

2021 年 4 月 13 日、Amazon Web Services は診断開発プログラムの新しい...

クラウドコンピューティングの失敗

[[389782]]最近の評価引き下げがなければ、IBMを除いてテクノロジー株を買うことより良い投資...

4月18日の百度の大型アップデートの簡単な分析

生理が来たからかどうかは分かりませんが、今回の百度アップデートで、特定の業界のウェブサイトランキング...

hostkvm: 生涯 25% オフ、香港 VPS + シンガポール VPS、トラフィック サポート

Hostkvm からメッセージが届きました: ウェブサイトが刷新され、新しい香港 VPS のトラフィ...

Kafka が高速である 6 つの理由

[[335450]]この記事はWeChatの公開アカウント「JavaKeeper」から転載したもので...

移植性と相互運用性: マルチクラウド成功の秘訣​

多くの企業は、ビジネス運営の効率、俊敏性、フォールト トレランスを向上させるために複数のクラウド プ...

百度は独創性に注意を払うべきだと言っている

百度の最近の動向は誰もが知っています。インターネットの将来において、百度は独創性にもっと注意を払うで...

JVM仮想マシンの全体構造とオブジェクトメモリ割り当ての分析

[[414275]] JVM仮想マシンの全体構造の分析全体構造の紹介jvm は次のように分かれていま...

Baidu はスナップショットの問題を疑い、多くのウェブサイトがロールバック

今日はとても不思議な日です。Baiduのスタッフのミスなのか、コンピューターの不具合なのかは分かりま...

raksmart: 日本のクラスター サーバー、50M 帯域幅、無制限トラフィック、月額 231 ドル、e3-1230/16g メモリ/1T ハード ディスク/258 IP

Raksmart クラスター サーバーはプロモーションを実施しています: 日本のクラスター サーバー...

AppleがApple Watchと新型MacBookを発売

3月10日の早朝、米国サンフランシスコのイエルバブエナ・センター・フォー・ジ・アーツでAppleの2...

「WeChatが世界を変える」の流行から学ぶ、企業のマーケティングのやり方

数日前、「WeChatマーケティングは思ったほど簡単ではない」というタイトルの記事を書き、WeCha...

itools: モンゴル VPS、モンゴル サーバー、無制限のトラフィック、月額 26 ドルからの支払い

モンゴルの VPS、モンゴルのクラウド サーバー: モンゴルの会社 itools.mn は 2011...

政府がオンライン世論を監視する際に直面する主な問題

近年、さまざまなタイプのネット世論事件が頻発しており、ネット世論活動は国家レベルの注目を集めています...