Kafka と MongoDB を使用した非同期処理

Kafka と MongoDB を使用した非同期処理

[[240575]]

前回のブログ投稿「初めての Go マイクロサービス: MongoDB と Docker を使用したマルチステージ ビルド」では、RESTful http エンドポイントを公開し、HTTP POST から受信したデータを MongoDB データベースに保存するサンプルの Go マイクロサービスを作成しました。

この例では、データ ストレージを MongoDB から分離し、それを処理するための別のマイクロサービスを作成しました。また、マイクロサービスが独自の懸念事項を非同期的に処理できるように、メッセージング レイヤーとして Kafka を追加しました。

このブログ投稿の全プロセスをこのビデオに録画しましたので、時間があればご覧ください :)

以下は、2 つのマイクロサービスを使用したこの単純な非同期処理の例の高レベル アーキテクチャ図です。

REST-Kafka-Mongo-マイクロサービス-Draw-IO

マイクロサービス 1 - /POST http 呼び出しからデータを受信する RESTful マイクロサービスです。リクエストを受信すると、http リクエストからデータを取得して Kafka に保存します。保存後、同じデータを /POST 経由で呼び出し元に送り返します。

マイクロサービス 2 - マイクロサービス 1 のデータが保存されている Kafka のトピックをサブスクライブするマイクロサービスです。メッセージがマイクロサービスによって消費されると、そのデータは MongoDB に保存されます。

先に進む前に、これらのマイクロサービスを実行するためにいくつかのものが必要です。

  1. Kafkaをダウンロード - 私はバージョンkafka_2.11-1.1.0を使用しました
  2. librdkafkaをインストールします - 残念ながら、このライブラリはターゲットシステム上に存在している必要があります
  3. Kafka Goクライアントをインストールする
  4. MongoDB を実行します。これについては、MongoDB Docker イメージを使用した以前の記事をお読みください。

さあ始めましょう!

まず、Kafka を起動します。 Kafka サーバーを実行する前に、Zookeeper を実行する必要があります。次に例を示します。

  1. $ cd /< download path >/ kafka_2 . 11 - 1.1 . 0
  2. $ bin / zookeeper - server - start . sh config / zookeeper . properties

次に、Kafka を実行します。Kafka への接続にはポート 9092 を使用します。ポートを変更する必要がある場合は、 config/server.propertiesで設定するだけです。私のような初心者の場合は、今のところデフォルトのポートを使用することをお勧めします。

  1. $ bin / kafka - server - start . sh config / server . properties

Kafka を実行したら、MongoDB が必要になります。とても簡単です。このdocker-compose.ymlを使用するだけです。

  1. version : '3'
  2. services :
  3. mongodb :
  4. image : mongo
  5. ports :
  6. - "27017:27017"
  7. volumes :
  8. - "mongodata:/data/db"
  9. networks :
  10. - network1
  11. volumes :
  12. mongodata :
  13. networks :
  14. network1 :

Docker Compose を使用して MongoDB Docker コンテナを実行します。

  1. docker - compose up

以下はマイクロサービス 1 に関連するコードです。以前の例を変更して、MongoDB ではなく Kafka に保存するようにしました。

レストからカフカへ/レストカフカサンプル.go

  1. func jobsPostHandler ( w http . ResponseWriter , r * http . Request ) {
  2. //Retrieve body from http request
  3. b , err := ioutil . ReadAll ( r . Body )
  4. defer r . Body . Close ()
  5. if err != nil {
  6. panic ( err )
  7. }
  8. //Save data into Job struct
  9. var _job Job
  10. err = json . Unmarshal ( b , & _job )
  11. if err != nil {
  12. http . Error ( w , err . Error (), 500 )
  13. return
  14. }
  15. saveJobToKafka ( _job )
  16. //Convert job struct into json
  17. jsonString , err := json . Marshal ( _job )
  18. if err != nil {
  19. http . Error ( w , err . Error (), 500 )
  20. return
  21. }
  22. //Set content-type http header
  23. w . Header (). Set ( "content-type" , "application/json" )
  24. //Send back data as response
  25. w . Write ( jsonString )
  26. }
  27. func saveJobToKafka ( job Job ) {
  28. fmt . Println ( "save to kafka" )
  29. jsonString , err := json . Marshal ( job )
  30. jobString := string ( jsonString )
  31. fmt . Print ( jobString )
  32. p , err := kafka . NewProducer (& kafka . ConfigMap { "bootstrap.servers" : "localhost:9092" })
  33. if err != nil {
  34. panic ( err )
  35. }
  36. // Produce messages to topic (asynchronously)
  37. topic := "jobs-topic1"
  38. for _ , word := range [] string { string ( jobString )} {
  39. p . Produce (& kafka . Message {
  40. TopicPartition : kafka . TopicPartition { Topic : & topic , Partition : kafka . PartitionAny },
  41. Value : [] byte ( word ),
  42. }, nil )
  43. }
  44. }

以下はマイクロサービス 2 のコードです。このコードで最も重要なことは、Kafka からデータを消費することです。保存部分については、前回のブログ投稿ですでに説明しました。ここでのコードの重要な部分は、Kafka からのデータの使用です。

kafka-to-mongo/kafka-mongo-sample.go

  1. func main () {
  2. //Create MongoDB session
  3. session := initialiseMongo ()
  4. mongoStore . session = session
  5. receiveFromKafka ()
  6. }
  7. func receiveFromKafka () {
  8. fmt . Println ( "Start receiving from Kafka" )
  9. c , err := kafka . NewConsumer (& kafka . ConfigMap {
  10. "bootstrap.servers" : "localhost:9092" ,
  11. "group.id" : "group-id-1" ,
  12. "auto.offset.reset" : "earliest" ,
  13. })
  14. if err != nil {
  15. panic ( err )
  16. }
  17. c . SubscribeTopics ([] string { "jobs-topic1" }, nil )
  18. for {
  19. msg , err := c . ReadMessage (- 1 )
  20. if err == nil {
  21. fmt . Printf ( "Received from Kafka %s: %s\n" , msg . TopicPartition , string ( msg . Value ))
  22. job := string ( msg . Value )
  23. saveJobToMongo ( job )
  24. } else {
  25. fmt . Printf ( "Consumer error: %v (%v)\n" , err , msg )
  26. break
  27. }
  28. }
  29. c . Close ()
  30. }
  31. func saveJobToMongo ( jobString string ) {
  32. fmt . Println ( "Save to MongoDB" )
  33. col := mongoStore . session . DB ( database ). C ( collection )
  34. //Save data into Job struct
  35. var _job Job
  36. b := [] byte ( jobString )
  37. err := json . Unmarshal ( b , & _job )
  38. if err != nil {
  39. panic ( err )
  40. }
  41. //Insert job into MongoDB
  42. errMongo := col . Insert ( _job )
  43. if errMongo != nil {
  44. panic ( errMongo )
  45. }
  46. fmt . Printf ( "Saved to MongoDB : %s" , jobString )
  47. }

マイクロサービス 1 のデモを行って実行してみましょう。Kafka が実行されていることを確認します。

  1. $ go run rest - kafka - sample . go

Postman を使用して、Microservice 1 にデータを送信します。

スクリーンショット-2018-04-29-22.20.33

以下はマイクロサービス 1 で確認できるログです。これが表示される場合、Postman から送信されたデータが受信され、Kafka に保存されたことを意味します。

スクリーンショット-2018-04-29-22.22.00

マイクロサービス 2 はまだ実行していないため、データはマイクロサービス 1 によって Kafka にのみ保存されます。マイクロサービス 2 を実行して、そのデータを消費し、MongoDB に保存しましょう。

  1. $ go run kafka - mongo - sample . go

これで、Microservice 2 で消費されたデータが表示され、MongoDB に保存されます。

スクリーンショット-2018-04-29-22.24.15

データが MongoDB に保存されているかどうかを確認します。データがあれば成功です!

スクリーンショット-2018-04-29-22.26.39

完全なソースコードはここにあります:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

<<:  IDC: Inspur Cloud は 2018 年第 1 四半期の中国のパブリック クラウド IaaS ベンダーの中で成長率第 1 位にランクされました

>>:  オラクル、自律型データベースクラウドサービスとPaaSの組み合わせを開始

推薦する

2013年7月の振り返りとまとめ

8 月初旬にいくつかの変化が起こり、私はこれまでの考察や要約を振り返ることになりました。私の心はまだ...

クラウド コンピューティング市場の爆発的な成長の中で、3 種類のクラウド サービスのバランスをどのように取るのでしょうか?

中国情報通信研究院が発表した2021年版「クラウドコンピューティング白書」によると、IaaSサービス...

Baidu の重みをチェックする場合、Aizhan と Webmaster Tools のどちらがより正確ですか?

本題に入る前に、少しお話しさせてください。最近とても忙しかったので、1 か月近く A5 に来て友人と...

ロサンゼルスのデータセンターにあるshockhostingのVPSの簡単なレビューでshockhostingの仕組みを説明します

shockhosting は広告もほとんどなく、非常に控えめな中小規模のサーバー業者ですが、製品の品...

alpharacks-$6.3/6g メモリ/160g ハードディスク/5T トラフィック/2IP/G ポート/ロサンゼルス

いじくり回すのに適した VPS をお勧めしたいと思います。alpharacks のこの OVZ は ...

Kaopu CloudがシリーズA資金調達を完了し、ハイブリッドクラウドの導入を加速させる

最近、国内のハイブリッドクラウドサービスプロバイダーであるKaopu Cloud(871182.OC...

ASO でアプリストアのランキングを向上させる 8 つのヒント

資金不足に悩む開発者にとって、ゲームを市場に出す上での露出は大きな欠点です。では、App Store...

Baidu アルゴリズムのパスと包含に関する問題

主に、Baidu アルゴリズムにおけるウェブサイトのパス、インクルード、パスとインクルードの問題につ...

国内化粧品ブランドのソーシャルメディアマーケティング事例

近年、中国の伝統的なスタイルやトレンドが非常に人気となり、新しく最先端の化粧品ブランドが急速に台頭し...

Baidu スナップショットロールバックの 6 つの主な原因の分析と対策

Baidu スナップショット ロールバックとは、Baidu スナップショットが前の日付に戻ることを意...

databasebydesignllc-$3.33/KVM/1g メモリ/30g ハードディスク/2T トラフィック/G ポート

databasebydesignllc は 2002 年に設立され、10 年以上の歴史があると主張し...

クラウド アプリケーション開発の効率化のための 5 つのヒント

クラウド テクノロジーが IT 業界を席巻している今日、クラウド コンピューティングの出現後に会社が...

ブログ開発はどんな困難に直面しますか?ボトルネック期間を突破するには?

インターネット上では、Zac の個人ブログ「SEO Post Every Day」、Lu Songs...

企業がWeiboを活用してブランド認知度を高める方法について話す

微博が人々の生活に欠かせないものとなり、いつでもどこでも自分の考えや写真を微博にアップロードすること...

URL 送信チャネル ツールは記事の掲載を促進できますか?

最近のBaiduのアルゴリズムの継続的な改善により、多くのウェブサイトが破壊されました。ほとんどのウ...