Kafka と MongoDB を使用した非同期処理

Kafka と MongoDB を使用した非同期処理

[[240575]]

前回のブログ投稿「初めての Go マイクロサービス: MongoDB と Docker を使用したマルチステージ ビルド」では、RESTful http エンドポイントを公開し、HTTP POST から受信したデータを MongoDB データベースに保存するサンプルの Go マイクロサービスを作成しました。

この例では、データ ストレージを MongoDB から分離し、それを処理するための別のマイクロサービスを作成しました。また、マイクロサービスが独自の懸念事項を非同期的に処理できるように、メッセージング レイヤーとして Kafka を追加しました。

このブログ投稿の全プロセスをこのビデオに録画しましたので、時間があればご覧ください :)

以下は、2 つのマイクロサービスを使用したこの単純な非同期処理の例の高レベル アーキテクチャ図です。

REST-Kafka-Mongo-マイクロサービス-Draw-IO

マイクロサービス 1 - /POST http 呼び出しからデータを受信する RESTful マイクロサービスです。リクエストを受信すると、http リクエストからデータを取得して Kafka に保存します。保存後、同じデータを /POST 経由で呼び出し元に送り返します。

マイクロサービス 2 - マイクロサービス 1 のデータが保存されている Kafka のトピックをサブスクライブするマイクロサービスです。メッセージがマイクロサービスによって消費されると、そのデータは MongoDB に保存されます。

先に進む前に、これらのマイクロサービスを実行するためにいくつかのものが必要です。

  1. Kafkaをダウンロード - 私はバージョンkafka_2.11-1.1.0を使用しました
  2. librdkafkaをインストールします - 残念ながら、このライブラリはターゲットシステム上に存在している必要があります
  3. Kafka Goクライアントをインストールする
  4. MongoDB を実行します。これについては、MongoDB Docker イメージを使用した以前の記事をお読みください。

さあ始めましょう!

まず、Kafka を起動します。 Kafka サーバーを実行する前に、Zookeeper を実行する必要があります。次に例を示します。

  1. $ cd /< download path >/ kafka_2 . 11 - 1.1 . 0
  2. $ bin / zookeeper - server - start . sh config / zookeeper . properties

次に、Kafka を実行します。Kafka への接続にはポート 9092 を使用します。ポートを変更する必要がある場合は、 config/server.propertiesで設定するだけです。私のような初心者の場合は、今のところデフォルトのポートを使用することをお勧めします。

  1. $ bin / kafka - server - start . sh config / server . properties

Kafka を実行したら、MongoDB が必要になります。とても簡単です。このdocker-compose.ymlを使用するだけです。

  1. version : '3'
  2. services :
  3. mongodb :
  4. image : mongo
  5. ports :
  6. - "27017:27017"
  7. volumes :
  8. - "mongodata:/data/db"
  9. networks :
  10. - network1
  11. volumes :
  12. mongodata :
  13. networks :
  14. network1 :

Docker Compose を使用して MongoDB Docker コンテナを実行します。

  1. docker - compose up

以下はマイクロサービス 1 に関連するコードです。以前の例を変更して、MongoDB ではなく Kafka に保存するようにしました。

レストからカフカへ/レストカフカサンプル.go

  1. func jobsPostHandler ( w http . ResponseWriter , r * http . Request ) {
  2. //Retrieve body from http request
  3. b , err := ioutil . ReadAll ( r . Body )
  4. defer r . Body . Close ()
  5. if err != nil {
  6. panic ( err )
  7. }
  8. //Save data into Job struct
  9. var _job Job
  10. err = json . Unmarshal ( b , & _job )
  11. if err != nil {
  12. http . Error ( w , err . Error (), 500 )
  13. return
  14. }
  15. saveJobToKafka ( _job )
  16. //Convert job struct into json
  17. jsonString , err := json . Marshal ( _job )
  18. if err != nil {
  19. http . Error ( w , err . Error (), 500 )
  20. return
  21. }
  22. //Set content-type http header
  23. w . Header (). Set ( "content-type" , "application/json" )
  24. //Send back data as response
  25. w . Write ( jsonString )
  26. }
  27. func saveJobToKafka ( job Job ) {
  28. fmt . Println ( "save to kafka" )
  29. jsonString , err := json . Marshal ( job )
  30. jobString := string ( jsonString )
  31. fmt . Print ( jobString )
  32. p , err := kafka . NewProducer (& kafka . ConfigMap { "bootstrap.servers" : "localhost:9092" })
  33. if err != nil {
  34. panic ( err )
  35. }
  36. // Produce messages to topic (asynchronously)
  37. topic := "jobs-topic1"
  38. for _ , word := range [] string { string ( jobString )} {
  39. p . Produce (& kafka . Message {
  40. TopicPartition : kafka . TopicPartition { Topic : & topic , Partition : kafka . PartitionAny },
  41. Value : [] byte ( word ),
  42. }, nil )
  43. }
  44. }

以下はマイクロサービス 2 のコードです。このコードで最も重要なことは、Kafka からデータを消費することです。保存部分については、前回のブログ投稿ですでに説明しました。ここでのコードの重要な部分は、Kafka からのデータの使用です。

kafka-to-mongo/kafka-mongo-sample.go

  1. func main () {
  2. //Create MongoDB session
  3. session := initialiseMongo ()
  4. mongoStore . session = session
  5. receiveFromKafka ()
  6. }
  7. func receiveFromKafka () {
  8. fmt . Println ( "Start receiving from Kafka" )
  9. c , err := kafka . NewConsumer (& kafka . ConfigMap {
  10. "bootstrap.servers" : "localhost:9092" ,
  11. "group.id" : "group-id-1" ,
  12. "auto.offset.reset" : "earliest" ,
  13. })
  14. if err != nil {
  15. panic ( err )
  16. }
  17. c . SubscribeTopics ([] string { "jobs-topic1" }, nil )
  18. for {
  19. msg , err := c . ReadMessage (- 1 )
  20. if err == nil {
  21. fmt . Printf ( "Received from Kafka %s: %s\n" , msg . TopicPartition , string ( msg . Value ))
  22. job := string ( msg . Value )
  23. saveJobToMongo ( job )
  24. } else {
  25. fmt . Printf ( "Consumer error: %v (%v)\n" , err , msg )
  26. break
  27. }
  28. }
  29. c . Close ()
  30. }
  31. func saveJobToMongo ( jobString string ) {
  32. fmt . Println ( "Save to MongoDB" )
  33. col := mongoStore . session . DB ( database ). C ( collection )
  34. //Save data into Job struct
  35. var _job Job
  36. b := [] byte ( jobString )
  37. err := json . Unmarshal ( b , & _job )
  38. if err != nil {
  39. panic ( err )
  40. }
  41. //Insert job into MongoDB
  42. errMongo := col . Insert ( _job )
  43. if errMongo != nil {
  44. panic ( errMongo )
  45. }
  46. fmt . Printf ( "Saved to MongoDB : %s" , jobString )
  47. }

マイクロサービス 1 のデモを行って実行してみましょう。Kafka が実行されていることを確認します。

  1. $ go run rest - kafka - sample . go

Postman を使用して、Microservice 1 にデータを送信します。

スクリーンショット-2018-04-29-22.20.33

以下はマイクロサービス 1 で確認できるログです。これが表示される場合、Postman から送信されたデータが受信され、Kafka に保存されたことを意味します。

スクリーンショット-2018-04-29-22.22.00

マイクロサービス 2 はまだ実行していないため、データはマイクロサービス 1 によって Kafka にのみ保存されます。マイクロサービス 2 を実行して、そのデータを消費し、MongoDB に保存しましょう。

  1. $ go run kafka - mongo - sample . go

これで、Microservice 2 で消費されたデータが表示され、MongoDB に保存されます。

スクリーンショット-2018-04-29-22.24.15

データが MongoDB に保存されているかどうかを確認します。データがあれば成功です!

スクリーンショット-2018-04-29-22.26.39

完全なソースコードはここにあります:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

<<:  IDC: Inspur Cloud は 2018 年第 1 四半期の中国のパブリック クラウド IaaS ベンダーの中で成長率第 1 位にランクされました

>>:  オラクル、自律型データベースクラウドサービスとPaaSの組み合わせを開始

推薦する

ウェブサイトの商業化は合理的でなければなりません。純粋な商業化はウェブサイト開発に役立ちません(パート 2)

数日前、私は「ウェブサイトの商業化は合理的でなければならない、純粋な商業化はウェブサイトの発展に悪影...

ウェブマスターネットワークからの毎日のレポート:EUがグーグルへの課税を引き上げ、ヤフーは27億ドルの損害賠償を支払うよう命じられた

1. EUはグーグルなどの米国のインターネット大手への課税を強化する計画海外メディアの報道によると、...

FrontRangeHosting-768m メモリ KVM/44g ハードディスク/1500g フロー/6 USD

最近、frontrangehostingはONAPPを立ち上げ、新しい請求管理システムhostbil...

タオバオライブはなぜ

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています基本的に、...

Z-Blogの使用経験

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています以前、私は...

創業者の解釈: Akamai のクラウド コンピューティング戦略の将来

著者: トム・レイトン博士 1998 年に Akamai Technologies を共同設立し、2...

ウェブサイトを「改善し続ける」ための複数のアプローチ

筆者はこれまで5~6のウェブサイトを運営してきました。最近、自分が手がけたいくつかのウェブサイトを分...

#11.11# メガレイヤー: 最低 159 元/年、フィリピン VPS\香港 VPS\米国 VPS、20M 帯域幅、無制限トラフィック

メガレイヤーは、11 月 30 日まで、今年の Double Eleven VPS プロモーションを...

Blue Bamboo Cloud: 新年の VPS プロモーション、香港 VPS (cn2+cmi) + 米国 VPS (AS4837)、月額 10 元から

Blue Bamboo Cloudは、新年のVPSプロモーションプランを発表しました。今回のプロモー...

quadix-$38/L5630/32g メモリ/250g SSD/30t トラフィック

quadix には非常に安価なサーバーが 2 つありますので、ご紹介します。心配しないでください。q...

underhost: 苦情防止 VPS、著作権無視、無料 cpanel + Windows ライセンス、香港/ロシア/オランダのデータセンター

もう一度、アンダーホストについてお話しましょう。この会社は 2007 年から運営されています。同社の...

「Tencent Game Manager」を例に、ユーザーの成長についてお話ししましょう。

1. 市場背景市場の状況を考え、分析することで、自社製品の意義や価値を検証し、開発に適した成長ポイン...

UEO はウェブサイトのキーワードランキングの未来でしょうか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますまず、タイ...

「ビッグバン・セオリー」などのアメリカのテレビシリーズが棚から撤去された

新浪科技によると、「ビッグ・リボウスキ」や「グッド・ワイフ」など、いくつかのアメリカのテレビシリーズ...

Hostshark: 月額 4.95 ドル、米国 VPS、2G メモリ/2 コア/80g NVMe/1Gbps 帯域幅 (トラフィック無制限)/Windows+Linux

Hostshark.ioは今年設立された新しい企業で、主に米国で仮想ホスティング、VPS、独立サーバ...