Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法は?

Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法は?

[[413839]]

[51CTO.com クイック翻訳] Kafka Connect は、特に強力なオープンソースのデータ ストリーミング ツールです。これにより、Kafka を他のデータ テクノロジーと簡単に組み合わせることができます。分散テクノロジーである Kafka Connect は、Kafka クラスターから独立した、非常に高い可用性と弾力的なスケーリングを提供します。 Kafka Connect は、ソース コネクタまたはシンク コネクタを使用して Kafka トピックとの間でデータを送受信し、さまざまな非 Kafka テクノロジーとのコード不要の統合を可能にします。

図1

多くの一般的なデータ テクノロジーに対応した堅牢なオープン ソース Kafka コネクタが用意されており、独自のコネクタを作成することもできます。この記事では、Kafka Connect を使用して、Kafka からのリアルタイム ストリーミング データを Elasticsearch (インデックス付けされた Kafka レコードのスケーラブルな検索を可能にする) および Kibana (その結果を視覚化する) と統合する実際のデータの使用例について説明します。

図2

Kafka と Kafka Connect の利点を示すユースケースとして、CDC COVID-19 データ トラッカーからヒントを得ました。 Kafka ベースのトラッカーは、複数の場所、複数の形式、複数のプロトコルからリアルタイムの COVID-19 検査データを収集し、これらのイベントを使いやすい視覚化に処理します。トラッカーには、結果が迅速に届き、信頼できるものであることを保証するために必要なデータ ガバナンス メカニズムも導入されています。

私は、同様に複雑で説得力のある、しかし理想的にはコロナウイルスのパンデミックほど心配の少ないユースケースを探し始めました。最終的に、私は興味深いドメインを見つけました。Moontide には、公開されているストリーミング REST API と、シンプルな JSON 形式の豊富なデータが含まれています。

月の潮汐データ

潮汐は月の一日に従います。月の一日は 24 時間 50 分の周期で、その間に地球は軌道を周回する衛星の下の同じ地点まで完全に自転します。月の重力により、毎月 2 回、満潮と干潮が起こります。

図3. アメリカ海洋大気庁より

アメリカ海洋大気庁 (NOAA) は、世界中の潮位観測所から詳細なセンサー データを簡単に取得できる REST API を提供しています。

図4

たとえば、次の REST 呼び出しでは、潮位観測所 ID、データ タイプ (海面を選択)、およびデータ (平均海面) を指定し、最新の結果をメートル単位で要求します。

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json

この呼び出しは、潮位観測所の緯度と経度、時間、水位値を含む JSON 結果を返します。返される結果のデータ型、量、単位がわかるように、何を呼び出したかを覚えておく必要があることに注意してください。

  1. { "メタデータ" : {
  2. "id" : "8724580"
  3. 「名前」 : 「キーウェスト」
  4. "緯度" :"24.5508",
  5. "経度" : "-81.8081" },
  6. "データ" :[{
  7. "t" : "2020-09-24 04:18" ,
  8. "v" : "0.597" ,
  9. "s" : "0.005" "f" : "1,0,0,0" "q" : "p" }]}

データ パイプラインを開始する (REST ソース コネクタを使用)

Kafka Connect ストリーミング データ パイプラインの作成を開始するには、まず Kafka クラスターと Kafka Connect クラスターを準備する必要があります。

図5

次に、オープンソースで入手可能な REST コネクタをインポートします。これを AWS S3 バケットにデプロイします (必要に応じて、これらの手順に従ってください)。次に、Kafka Connect クラスターに S3 バケットを使用するように要求し、クラスター内で表示されるように同期し、コネクタを構成して、最後に実行します。この「BYOC」(Bring Your Own Connector)アプローチにより、特定の要件を満たすコネクタを見つける方法が無数に確保されます。

図6

次の例は、「curl」コマンドを使用して REST API を使用するように完全にオープン ソースの Kafka Connect デプロイメントを構成する方法を示しています。独自の展開に合わせて URL、名前、パスワードを変更する必要があることに注意してください。

  1. curl https://connectorClusterIP:8083/connectors -k -u名前:パスワード-X POST -H 'Content-Type: application/json' -d '
  2. {
  3. 「名前」 : 「source_rest_tide_1」
  4. 「設定」 :{
  5. "key.converter" : "org.apache.kafka.connect.storage.StringConverter"
  6. "value.converter" : "org.apache.kafka.connect.storage.StringConverter"
  7. "コネクタ.クラス" : "com.tm.kafka.connect.rest.RestSourceConnector"
  8. "タスク.max" : "1" ,
  9. "rest.source.poll.interval.ms" : "600000" ,
  10. "rest.source.method" : "GET"
  11. "rest.source.url" : "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json"
  12. "rest.source.headers" : "コンテンツタイプ:application/json、Accept:application/json"
  13. "rest.source.topic.selector" : "com.tm.kafka.connect.rest.selector.SimpleTopicSelector"
  14. "rest.source.destination.topics" : "潮汐トピック"  
  15. }
  16. }

このコードによって作成されたコネクタ タスクは、10 分間隔で REST API をポーリングし、その結果を「tides-topic」Kafka トピックに書き込みます。このように 5 つの潮汐センサーをランダムに選択してデータを収集すると、5 つの構成と 5 つの接続を通じて潮汐データが潮汐テーマに入力されるようになります。

図7

パイプラインを終了する(Elasticsearch シンクコネクタを使用)

この Tidal データをどこかに保存するには、パイプラインの最後に Elasticsearch クラスターと Kibana を導入します。 Elasticsearch にデータを送信するために、オープンソースの Elasticsearch シンク コネクタを構成します。

図8

次の構成例では、シンク名、クラス、Elasticsearch インデックス、および Kafka トピックを使用します。インデックスがまだ存在しない場合は、デフォルトのマッピングを使用してインデックスが作成されます。

  1. curl https://connectorClusterIP:8083/connectors -k -u名前:パスワード-X POST -H 'Content-Type: application/json' -d '
  2. {
  3. 「名前」 : 「弾性沈下潮」
  4. 「設定」 :
  5. {
  6. "コネクタ.クラス" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector"
  7. "タスク.max" : 3,
  8. 「トピック」 「潮汐」
  9. "connect.elastic.hosts" : "ip",
  10. "connect.elastic.port" : 9201,
  11. "connect.elastic.kcql" : "tides-indexに挿入し、tides-topicから*を選択します"
  12. "connect.elastic.use.http.username" : "elasticName",
  13. "connect.elastic.use.http.password" : "elasticPassword"
  14. }
  15. }'

パイプラインは現在稼働中です。ただし、デフォルトのインデックス マッピングにより、Tides インデックスに入力されるすべての潮汐データは文字列になります。

図9

時系列データを正確にプロットするには、カスタム マッピングが必要です。以下の Tidal インデックスのカスタム マッピングを作成します。カスタム日付には JSON の「t」フィールド、倍精度数値には「v」、集計を表すキーワードには「name」を使用します。

  1. curl -u elasticName:elasticPassword "elasticURL:9201/tides- index " -X PUT -H 'Content-Type: application/json' -d'
  2. {
  3. 「マッピング」 : {
  4. 「プロパティ」 : {
  5. "データ" : {
  6. 「プロパティ」 : {
  7. "t" : { "type" : "date" ,
  8. 「フォーマット」 : 「yyyy-MM-dd HH:mm」  
  9. },
  10. "v" : { "type" : "double" },
  11. "f" : { "type" : "text" },
  12. "q" : { "type" : "text" },
  13. "s" : { "タイプ" : "テキスト" }
  14. }
  15. },
  16. 「メタデータ」 : {
  17. 「プロパティ」 : {
  18. 「id」 : { 「タイプ」 : 「テキスト」 },
  19. "lat" : { "type" : "text" },
  20. "long" : { "type" : "text" },
  21. 「名前」 : { 「タイプ」 : 「キーワード」 } }}}} }'

Elasticsearch インデックス マッピングを変更するたびに、通常は Elasticsearch の「再インデックス」 (インデックスを削除し、すべてのデータの再インデックス) を行う必要があります。データは、このユースケースのように既存の Kafka シンク コネクタから再生することも、Elasticsearch の再インデックス操作を使用して取得することもできます。

Kibana でデータを視覚化する

潮汐データを視覚化するには、まず Kibana でインデックス パターンを作成し、時間フィルター フィールドとして「t」を設定します。次に、折れ線グラフの種類を選択して視覚化を作成します。最後に、グラフ設定を構成して、y 軸に 30 分間の平均潮位が表示され、x 軸にこのデータが時間の経過とともに表示されるようにします。

結果は、パイプラインがデータを収集する 5 つのサンプル潮位観測所における潮汐の変化を示す次のグラフになります。

図10

結果

視覚化により、毎月 2 回の満潮が発生するという潮の周期的な性質がはっきりとわかります。

図11

さらに驚くべきことは、満潮と干潮の間隔が世界中のすべての潮位観測所で同じではないということです。これは月だけでなく、太陽、地元の地理、天気、気候の変化によっても影響を受けます。この Kafka Connect パイプラインの例では、Kafka、Elasticsearch、Kibana を活用して視覚化の威力を実証しています。視覚化により、生データでは明らかにできない情報が明らかになることがよくあります。

元のタイトル: Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法、著者: Paul Brebner

[51CTOによる翻訳。パートナーサイトに転載する場合は、元の翻訳者と出典を51CTO.comとして明記してください。

<<:  分散調整フレームワークZookeeperのコア設計の理解と実践

>>:  私の国の通信事業者はクラウドコンピューティングの導入において3つの大きな課題に直面しています

推薦する

初心者ウェブマスターのためのリンクベイトの作り方

リンク ベイト、ウェブマスターはみんなリンク ベイトについて聞いたことがあるでしょうが、どうやって作...

hostkvm: 香港 cn2 高速 VPS、20% 割引、月額 7.6 ドルから、2G メモリ/1 コア/40gSSD/120G トラフィック

Hostkvm は現在、香港葵湾データセンターで香港 CN2 VPS を推進しています。この VPS...

myrsk-$6/KVM/3IP/1G メモリ/70G ハードディスク/1T トラフィック/フェニックス シティ

myrskはHostcatに何度も登場しています。同社は2009年に設立されました。以前は通常のVP...

電子商取引のトピックに関する議論: お金を稼ぐ人はなぜ皆、Taobao でお金を稼ぐのでしょうか?

「今は電子商取引の冬だ」と言われているが、タオバオ、特に伝統的なブランド企業はTmallでかなり良い...

王通:誰かが悪意を持ってオンラインで攻撃してきたらどうしますか?

ショートビデオ、セルフメディア、インフルエンサーのためのワンストップサービス先月、**クラスの生徒2...

2021年のクラウドネイティブトレンドの予測

[編集者注] この記事の著者は、クラウド ネイティブ エンジニアとしての利点を活かして、クラウド ネ...

クラウド コンピューティング チップ戦争: 小さな暗雲か、それとも大きな嵐か?

1900 年、尊敬を集めるケルビン卿がロンドンのアルベマール ストリートにある王立研究所で講演を行い...

ブランドマーケティングを活用する方法を本当にご存知ですか?

多くのブランドは、マーケティングカレンダーを参考にして適切なノードを選択し、年間マーケティング計画を...

記事は含まれているがランキングされていない問題を解決する方法についての簡単な分析

先ほど、誤ってA5に行って記事を読んでしまいました。記事は含まれているのにランク付けされていないとい...

Alibaba Cloud エコシステムがインテリジェントな「ナビゲーションの時代」を切り開く

業界が成長すると、Alibaba Cloud はエコシステムを構築します。業界がエコシステムを構築し...

Google+1 を介した最適化における不正行為の害についての簡単な説明

Google+1 は検索としては比較的新しいものです。しかし、ある意味では検索に大きな影響を与え始め...

Cloudsilkの香港VPSレビュー、3つのネットワークに中国移動のCMI回線をバックホールに使用させる

一昨日cloudsilkが立ち上げた香港のVPSはモバイルCMIとBGPネットワークに接続されていま...

新浪はポルノ関連の犯罪で重い罰金を科される可能性があり、NBAのライブ放送事業の将来は不透明

劉佳新浪はわいせつな情報やポルノ情報を流布したとして、81万5000ドルの罰金を科せられ、一部のライ...

Pinduoduoは新たな戦場を見つけたのか?

11月中旬、中国の3大電子商取引企業、アリババ、JD.com、ピンドゥオドゥオが相次いで第3四半期の...