Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法は?

Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法は?

[[413839]]

[51CTO.com クイック翻訳] Kafka Connect は、特に強力なオープンソースのデータ ストリーミング ツールです。これにより、Kafka を他のデータ テクノロジーと簡単に組み合わせることができます。分散テクノロジーである Kafka Connect は、Kafka クラスターから独立した、非常に高い可用性と弾力的なスケーリングを提供します。 Kafka Connect は、ソース コネクタまたはシンク コネクタを使用して Kafka トピックとの間でデータを送受信し、さまざまな非 Kafka テクノロジーとのコード不要の統合を可能にします。

図1

多くの一般的なデータ テクノロジーに対応した堅牢なオープン ソース Kafka コネクタが用意されており、独自のコネクタを作成することもできます。この記事では、Kafka Connect を使用して、Kafka からのリアルタイム ストリーミング データを Elasticsearch (インデックス付けされた Kafka レコードのスケーラブルな検索を可能にする) および Kibana (その結果を視覚化する) と統合する実際のデータの使用例について説明します。

図2

Kafka と Kafka Connect の利点を示すユースケースとして、CDC COVID-19 データ トラッカーからヒントを得ました。 Kafka ベースのトラッカーは、複数の場所、複数の形式、複数のプロトコルからリアルタイムの COVID-19 検査データを収集し、これらのイベントを使いやすい視覚化に処理します。トラッカーには、結果が迅速に届き、信頼できるものであることを保証するために必要なデータ ガバナンス メカニズムも導入されています。

私は、同様に複雑で説得力のある、しかし理想的にはコロナウイルスのパンデミックほど心配の少ないユースケースを探し始めました。最終的に、私は興味深いドメインを見つけました。Moontide には、公開されているストリーミング REST API と、シンプルな JSON 形式の豊富なデータが含まれています。

月の潮汐データ

潮汐は月の一日に従います。月の一日は 24 時間 50 分の周期で、その間に地球は軌道を周回する衛星の下の同じ地点まで完全に自転します。月の重力により、毎月 2 回、満潮と干潮が起こります。

図3. アメリカ海洋大気庁より

アメリカ海洋大気庁 (NOAA) は、世界中の潮位観測所から詳細なセンサー データを簡単に取得できる REST API を提供しています。

図4

たとえば、次の REST 呼び出しでは、潮位観測所 ID、データ タイプ (海面を選択)、およびデータ (平均海面) を指定し、最新の結果をメートル単位で要求します。

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json

この呼び出しは、潮位観測所の緯度と経度、時間、水位値を含む JSON 結果を返します。返される結果のデータ型、量、単位がわかるように、何を呼び出したかを覚えておく必要があることに注意してください。

  1. { "メタデータ" : {
  2. "id" : "8724580"
  3. 「名前」 : 「キーウェスト」
  4. "緯度" :"24.5508",
  5. "経度" : "-81.8081" },
  6. "データ" :[{
  7. "t" : "2020-09-24 04:18" ,
  8. "v" : "0.597" ,
  9. "s" : "0.005" "f" : "1,0,0,0" "q" : "p" }]}

データ パイプラインを開始する (REST ソース コネクタを使用)

Kafka Connect ストリーミング データ パイプラインの作成を開始するには、まず Kafka クラスターと Kafka Connect クラスターを準備する必要があります。

図5

次に、オープンソースで入手可能な REST コネクタをインポートします。これを AWS S3 バケットにデプロイします (必要に応じて、これらの手順に従ってください)。次に、Kafka Connect クラスターに S3 バケットを使用するように要求し、クラスター内で表示されるように同期し、コネクタを構成して、最後に実行します。この「BYOC」(Bring Your Own Connector)アプローチにより、特定の要件を満たすコネクタを見つける方法が無数に確保されます。

図6

次の例は、「curl」コマンドを使用して REST API を使用するように完全にオープン ソースの Kafka Connect デプロイメントを構成する方法を示しています。独自の展開に合わせて URL、名前、パスワードを変更する必要があることに注意してください。

  1. curl https://connectorClusterIP:8083/connectors -k -u名前:パスワード-X POST -H 'Content-Type: application/json' -d '
  2. {
  3. 「名前」 : 「source_rest_tide_1」
  4. 「設定」 :{
  5. "key.converter" : "org.apache.kafka.connect.storage.StringConverter"
  6. "value.converter" : "org.apache.kafka.connect.storage.StringConverter"
  7. "コネクタ.クラス" : "com.tm.kafka.connect.rest.RestSourceConnector"
  8. "タスク.max" : "1" ,
  9. "rest.source.poll.interval.ms" : "600000" ,
  10. "rest.source.method" : "GET"
  11. "rest.source.url" : "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json"
  12. "rest.source.headers" : "コンテンツタイプ:application/json、Accept:application/json"
  13. "rest.source.topic.selector" : "com.tm.kafka.connect.rest.selector.SimpleTopicSelector"
  14. "rest.source.destination.topics" : "潮汐トピック"  
  15. }
  16. }

このコードによって作成されたコネクタ タスクは、10 分間隔で REST API をポーリングし、その結果を「tides-topic」Kafka トピックに書き込みます。このように 5 つの潮汐センサーをランダムに選択してデータを収集すると、5 つの構成と 5 つの接続を通じて潮汐データが潮汐テーマに入力されるようになります。

図7

パイプラインを終了する(Elasticsearch シンクコネクタを使用)

この Tidal データをどこかに保存するには、パイプラインの最後に Elasticsearch クラスターと Kibana を導入します。 Elasticsearch にデータを送信するために、オープンソースの Elasticsearch シンク コネクタを構成します。

図8

次の構成例では、シンク名、クラス、Elasticsearch インデックス、および Kafka トピックを使用します。インデックスがまだ存在しない場合は、デフォルトのマッピングを使用してインデックスが作成されます。

  1. curl https://connectorClusterIP:8083/connectors -k -u名前:パスワード-X POST -H 'Content-Type: application/json' -d '
  2. {
  3. 「名前」 : 「弾性沈下潮」
  4. 「設定」 :
  5. {
  6. "コネクタ.クラス" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector"
  7. "タスク.max" : 3,
  8. 「トピック」 「潮汐」
  9. "connect.elastic.hosts" : "ip",
  10. "connect.elastic.port" : 9201,
  11. "connect.elastic.kcql" : "tides-indexに挿入し、tides-topicから*を選択します"
  12. "connect.elastic.use.http.username" : "elasticName",
  13. "connect.elastic.use.http.password" : "elasticPassword"
  14. }
  15. }'

パイプラインは現在稼働中です。ただし、デフォルトのインデックス マッピングにより、Tides インデックスに入力されるすべての潮汐データは文字列になります。

図9

時系列データを正確にプロットするには、カスタム マッピングが必要です。以下の Tidal インデックスのカスタム マッピングを作成します。カスタム日付には JSON の「t」フィールド、倍精度数値には「v」、集計を表すキーワードには「name」を使用します。

  1. curl -u elasticName:elasticPassword "elasticURL:9201/tides- index " -X PUT -H 'Content-Type: application/json' -d'
  2. {
  3. 「マッピング」 : {
  4. 「プロパティ」 : {
  5. "データ" : {
  6. 「プロパティ」 : {
  7. "t" : { "type" : "date" ,
  8. 「フォーマット」 : 「yyyy-MM-dd HH:mm」  
  9. },
  10. "v" : { "type" : "double" },
  11. "f" : { "type" : "text" },
  12. "q" : { "type" : "text" },
  13. "s" : { "タイプ" : "テキスト" }
  14. }
  15. },
  16. 「メタデータ」 : {
  17. 「プロパティ」 : {
  18. 「id」 : { 「タイプ」 : 「テキスト」 },
  19. "lat" : { "type" : "text" },
  20. "long" : { "type" : "text" },
  21. 「名前」 : { 「タイプ」 : 「キーワード」 } }}}} }'

Elasticsearch インデックス マッピングを変更するたびに、通常は Elasticsearch の「再インデックス」 (インデックスを削除し、すべてのデータの再インデックス) を行う必要があります。データは、このユースケースのように既存の Kafka シンク コネクタから再生することも、Elasticsearch の再インデックス操作を使用して取得することもできます。

Kibana でデータを視覚化する

潮汐データを視覚化するには、まず Kibana でインデックス パターンを作成し、時間フィルター フィールドとして「t」を設定します。次に、折れ線グラフの種類を選択して視覚化を作成します。最後に、グラフ設定を構成して、y 軸に 30 分間の平均潮位が表示され、x 軸にこのデータが時間の経過とともに表示されるようにします。

結果は、パイプラインがデータを収集する 5 つのサンプル潮位観測所における潮汐の変化を示す次のグラフになります。

図10

結果

視覚化により、毎月 2 回の満潮が発生するという潮の周期的な性質がはっきりとわかります。

図11

さらに驚くべきことは、満潮と干潮の間隔が世界中のすべての潮位観測所で同じではないということです。これは月だけでなく、太陽、地元の地理、天気、気候の変化によっても影響を受けます。この Kafka Connect パイプラインの例では、Kafka、Elasticsearch、Kibana を活用して視覚化の威力を実証しています。視覚化により、生データでは明らかにできない情報が明らかになることがよくあります。

元のタイトル: Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法、著者: Paul Brebner

[51CTOによる翻訳。パートナーサイトに転載する場合は、元の翻訳者と出典を51CTO.comとして明記してください。

<<:  分散調整フレームワークZookeeperのコア設計の理解と実践

>>:  私の国の通信事業者はクラウドコンピューティングの導入において3つの大きな課題に直面しています

推薦する

伝統的な大物たちはインターネットで飾り付けをしたいので、まず頭の後ろの三つ編みを切る

『易経』には「君子は豹のように変わり、悪人は顔つきを変える」という格言がある。翻訳すると、状況が変わ...

統合ネットワークマーケティングに関する簡単な説明

メディア多様化の時代では、伝統的なメディアと新しいメディアが視聴者をめぐって競争し、視聴者自身のニー...

回答: レンガ職人になるのはいかがですか?レンガ職人のよくある問題、ぜひここで質問してください!

BandwagonHost VPS は中国で非常に人気があり、多くの初心者が BandwagonHo...

中国と米国間の最速のネットワーク回線である、信頼できるcn2 gia vpsマーチャントをいくつか推奨します

ウェブマスターは、cn2 gia ネットワークを提供する複数の VPS 販売業者を慎重に選択して推奨...

mivocloud - 高セキュリティ無制限トラフィック VPS/5 ユーロ/2g メモリ/40g SSD/openstack

安価で強力なヨーロッパの VPS が必要な場合は、mivocloud を試してみてください。データ ...

ウェブマスターの皆さん、ウェブサイトをいじって遊んでいますか、それとも真剣に構築するつもりですか?

ウェブサイトの構築は、お金を稼ぐためでも、趣味のためでも構いません。前者はいわゆる「ウェブサイトを作...

簡単な説明: Baidu 入札アカウントの開設プロセスとプロモーションスキル

ご存知のとおり、SEO 最適化とは、キーワードを継続的に検討して最適化し、企業の Web サイトが検...

kvmla: 香港将軍澳データセンター VPS が 20% オフ、追加 1G メモリと Windows サポート付き

kvmla は香港の将軍澳に新しいデータセンターを追加し、現在 VPS を販売しています。新しいキャ...

英国2〜5ポンド/Xen/1Gメモリ/25GSSD/無制限Gポート/英国

UK2 グループの同名ブランド UK2.NET では、クリスマス プロモーションを実施しており、仮想...

SEOにタグを使用するタイミング

SEO 担当者は、最適化やスパム対策にタグを使用してきた長い歴史があります。しかし、タグを使用して ...

地元のサイトを守る4つの盾

ローカル Web サイトの構築は、常に多くの幸運を伴う問題です。いくつかのローカル Web サイトは...

ニュースサイトの未来:地域社会の収益見通し

ニュースサイトを企業に変えるパイロットプログラムは2年間続いており、行政機関から企業へと、アイデンテ...

政府クラウドを効果的に管理する3つのメリット

米国の一部の州政府や地方自治体で、クラウド コンピューティング環境が混乱状態にあることは珍しくありま...

netcloud-10 USD/1G RAM/24G HDD/1T トラフィック/KVM/onapp クラウド

ご存知のとおり、現在主流の「駆動クラウド」プラットフォームは onapp と openstsck で...

Dockerをオフラインで素早くインストールする方法

Docker はオープンソースのアプリケーション コンテナー エンジンであり、開発者はアプリケーショ...