Kafka と MongoDB を使用した非同期処理

Kafka と MongoDB を使用した非同期処理

[[240575]]

前回のブログ投稿「初めての Go マイクロサービス: MongoDB と Docker を使用したマルチステージ ビルド」では、RESTful http エンドポイントを公開し、HTTP POST から受信したデータを MongoDB データベースに保存するサンプルの Go マイクロサービスを作成しました。

この例では、データ ストレージを MongoDB から分離し、それを処理するための別のマイクロサービスを作成しました。また、マイクロサービスが独自の懸念事項を非同期的に処理できるように、メッセージング レイヤーとして Kafka を追加しました。

このブログ投稿の全プロセスをこのビデオに録画しましたので、時間があればご覧ください :)

以下は、2 つのマイクロサービスを使用したこの単純な非同期処理の例の高レベル アーキテクチャ図です。

REST-Kafka-Mongo-マイクロサービス-Draw-IO

マイクロサービス 1 - /POST http 呼び出しからデータを受信する RESTful マイクロサービスです。リクエストを受信すると、http リクエストからデータを取得して Kafka に保存します。保存後、同じデータを /POST 経由で呼び出し元に送り返します。

マイクロサービス 2 - マイクロサービス 1 のデータが保存されている Kafka のトピックをサブスクライブするマイクロサービスです。メッセージがマイクロサービスによって消費されると、そのデータは MongoDB に保存されます。

先に進む前に、これらのマイクロサービスを実行するためにいくつかのものが必要です。

  1. Kafkaをダウンロード - 私はバージョンkafka_2.11-1.1.0を使用しました
  2. librdkafkaをインストールします - 残念ながら、このライブラリはターゲットシステム上に存在している必要があります
  3. Kafka Goクライアントをインストールする
  4. MongoDB を実行します。これについては、MongoDB Docker イメージを使用した以前の記事をお読みください。

さあ始めましょう!

まず、Kafka を起動します。 Kafka サーバーを実行する前に、Zookeeper を実行する必要があります。次に例を示します。

  1. $ cd /< download path >/ kafka_2 . 11 - 1.1 . 0
  2. $ bin / zookeeper - server - start . sh config / zookeeper . properties

次に、Kafka を実行します。Kafka への接続にはポート 9092 を使用します。ポートを変更する必要がある場合は、 config/server.propertiesで設定するだけです。私のような初心者の場合は、今のところデフォルトのポートを使用することをお勧めします。

  1. $ bin / kafka - server - start . sh config / server . properties

Kafka を実行したら、MongoDB が必要になります。とても簡単です。このdocker-compose.ymlを使用するだけです。

  1. version : '3'
  2. services :
  3. mongodb :
  4. image : mongo
  5. ports :
  6. - "27017:27017"
  7. volumes :
  8. - "mongodata:/data/db"
  9. networks :
  10. - network1
  11. volumes :
  12. mongodata :
  13. networks :
  14. network1 :

Docker Compose を使用して MongoDB Docker コンテナを実行します。

  1. docker - compose up

以下はマイクロサービス 1 に関連するコードです。以前の例を変更して、MongoDB ではなく Kafka に保存するようにしました。

レストからカフカへ/レストカフカサンプル.go

  1. func jobsPostHandler ( w http . ResponseWriter , r * http . Request ) {
  2. //Retrieve body from http request
  3. b , err := ioutil . ReadAll ( r . Body )
  4. defer r . Body . Close ()
  5. if err != nil {
  6. panic ( err )
  7. }
  8. //Save data into Job struct
  9. var _job Job
  10. err = json . Unmarshal ( b , & _job )
  11. if err != nil {
  12. http . Error ( w , err . Error (), 500 )
  13. return
  14. }
  15. saveJobToKafka ( _job )
  16. //Convert job struct into json
  17. jsonString , err := json . Marshal ( _job )
  18. if err != nil {
  19. http . Error ( w , err . Error (), 500 )
  20. return
  21. }
  22. //Set content-type http header
  23. w . Header (). Set ( "content-type" , "application/json" )
  24. //Send back data as response
  25. w . Write ( jsonString )
  26. }
  27. func saveJobToKafka ( job Job ) {
  28. fmt . Println ( "save to kafka" )
  29. jsonString , err := json . Marshal ( job )
  30. jobString := string ( jsonString )
  31. fmt . Print ( jobString )
  32. p , err := kafka . NewProducer (& kafka . ConfigMap { "bootstrap.servers" : "localhost:9092" })
  33. if err != nil {
  34. panic ( err )
  35. }
  36. // Produce messages to topic (asynchronously)
  37. topic := "jobs-topic1"
  38. for _ , word := range [] string { string ( jobString )} {
  39. p . Produce (& kafka . Message {
  40. TopicPartition : kafka . TopicPartition { Topic : & topic , Partition : kafka . PartitionAny },
  41. Value : [] byte ( word ),
  42. }, nil )
  43. }
  44. }

以下はマイクロサービス 2 のコードです。このコードで最も重要なことは、Kafka からデータを消費することです。保存部分については、前回のブログ投稿ですでに説明しました。ここでのコードの重要な部分は、Kafka からのデータの使用です。

kafka-to-mongo/kafka-mongo-sample.go

  1. func main () {
  2. //Create MongoDB session
  3. session := initialiseMongo ()
  4. mongoStore . session = session
  5. receiveFromKafka ()
  6. }
  7. func receiveFromKafka () {
  8. fmt . Println ( "Start receiving from Kafka" )
  9. c , err := kafka . NewConsumer (& kafka . ConfigMap {
  10. "bootstrap.servers" : "localhost:9092" ,
  11. "group.id" : "group-id-1" ,
  12. "auto.offset.reset" : "earliest" ,
  13. })
  14. if err != nil {
  15. panic ( err )
  16. }
  17. c . SubscribeTopics ([] string { "jobs-topic1" }, nil )
  18. for {
  19. msg , err := c . ReadMessage (- 1 )
  20. if err == nil {
  21. fmt . Printf ( "Received from Kafka %s: %s\n" , msg . TopicPartition , string ( msg . Value ))
  22. job := string ( msg . Value )
  23. saveJobToMongo ( job )
  24. } else {
  25. fmt . Printf ( "Consumer error: %v (%v)\n" , err , msg )
  26. break
  27. }
  28. }
  29. c . Close ()
  30. }
  31. func saveJobToMongo ( jobString string ) {
  32. fmt . Println ( "Save to MongoDB" )
  33. col := mongoStore . session . DB ( database ). C ( collection )
  34. //Save data into Job struct
  35. var _job Job
  36. b := [] byte ( jobString )
  37. err := json . Unmarshal ( b , & _job )
  38. if err != nil {
  39. panic ( err )
  40. }
  41. //Insert job into MongoDB
  42. errMongo := col . Insert ( _job )
  43. if errMongo != nil {
  44. panic ( errMongo )
  45. }
  46. fmt . Printf ( "Saved to MongoDB : %s" , jobString )
  47. }

マイクロサービス 1 のデモを行って実行してみましょう。Kafka が実行されていることを確認します。

  1. $ go run rest - kafka - sample . go

Postman を使用して、Microservice 1 にデータを送信します。

スクリーンショット-2018-04-29-22.20.33

以下はマイクロサービス 1 で確認できるログです。これが表示される場合、Postman から送信されたデータが受信され、Kafka に保存されたことを意味します。

スクリーンショット-2018-04-29-22.22.00

マイクロサービス 2 はまだ実行していないため、データはマイクロサービス 1 によって Kafka にのみ保存されます。マイクロサービス 2 を実行して、そのデータを消費し、MongoDB に保存しましょう。

  1. $ go run kafka - mongo - sample . go

これで、Microservice 2 で消費されたデータが表示され、MongoDB に保存されます。

スクリーンショット-2018-04-29-22.24.15

データが MongoDB に保存されているかどうかを確認します。データがあれば成功です!

スクリーンショット-2018-04-29-22.26.39

完全なソースコードはここにあります:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

<<:  IDC: Inspur Cloud は 2018 年第 1 四半期の中国のパブリック クラウド IaaS ベンダーの中で成長率第 1 位にランクされました

>>:  オラクル、自律型データベースクラウドサービスとPaaSの組み合わせを開始

推薦する

山東東営あなたのコース製品は競争力、Facebookグループ制御マーケティング完全なネットワークレイアウト

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っていますFaceb...

Google 広告のクリック数が足りませんか?広告最適化テクニックについて学ぶ

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています近年、ます...

RBAC を使用して Kubernetes リソースへのアクセスを制限する

この記事では、Kubernetes RBAC 認証モデルを最初から再作成する方法と、Roles、Cl...

Dangdang.com はなぜ成長できなかったのでしょうか?

はじめに:なぜ当当は大きく成長できないのか?それは上場のタイミングの問題であり、事業レイアウトの問題...

Weiboマーケティング:間違って使っているマーケティングツール

私たちはモバイルインターネットが普及した時代に生きています。私たちの生活の中の情報はあらゆる方向でつ...

「オープンソース」によって作成され、「Haiyun Jiexun」によって運営されています

猛暑の6月でも、オープンソースへの熱意は衰えるどこ​​ろか、高まっています。 「2018 Openi...

ウェブサイトのキーワードランキングSEO最適化は段階的に行う必要がある

最新の映画サイトを立ち上げてからまだ 2 か月も経っていませんが、最適化のテクニックと経験を皆さんと...

Baidu入札とSEOの比較についての簡単な説明

百度が現在、中国のインターネット市場における検索エンジンシェアの 85% 以上を占めていることは周知...

sentris-VPS/3 年 5 ドル | 年額 25 ドル、4 IP、2g メモリ、KVM VPS | シアトル データ センター

Sentris は、シアトル データ センターの KVM 仮想 VPS という新しいプロモーションを...

地方病院のウェブサイトは3つの問題に注意する必要がある

インターネット時代がますます発展するにつれて、各地の病院のウェブサイトも発展しており、市場の見通しは...

4月のオンラインバンキング報道動向:Alipayが優勢、CCBがトップ

IDC Review Network (idcps.com) は 5 月 29 日に次のように報告し...

分析例: ウェブサイトがそもそも存在しないという事実は、そのサイトの権威が低下することを意味しますか?

今朝目覚めると、北京SEOブログのホームページが1位ではなく、ランキングも下がっていました。私はこれ...

Ucomm のマーケティングの約束は真実でしょうか、それとも嘘でしょうか?

2018年最もホットなプロジェクト:テレマーケティングロボットがあなたの参加を待っています現在、多く...

「クラウドアプリケーション」が経済回復を加速させるが、クラウド化への道のりには課題が残る

「クラウド教室」は授業が中断されても学生が学習を継続できるようにし、「クラウドオフィス」は職場の人々...

複数サイトのホームページが1位にならない理由

最近、自分のウェブサイトの外部リンクを分析していたところ、検査結果に「ホームページが最初ではない」と...