Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法は?

Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法は?

[[413839]]

[51CTO.com クイック翻訳] Kafka Connect は、特に強力なオープンソースのデータ ストリーミング ツールです。これにより、Kafka を他のデータ テクノロジーと簡単に組み合わせることができます。分散テクノロジーである Kafka Connect は、Kafka クラスターから独立した、非常に高い可用性と弾力的なスケーリングを提供します。 Kafka Connect は、ソース コネクタまたはシンク コネクタを使用して Kafka トピックとの間でデータを送受信し、さまざまな非 Kafka テクノロジーとのコード不要の統合を可能にします。

図1

多くの一般的なデータ テクノロジーに対応した堅牢なオープン ソース Kafka コネクタが用意されており、独自のコネクタを作成することもできます。この記事では、Kafka Connect を使用して、Kafka からのリアルタイム ストリーミング データを Elasticsearch (インデックス付けされた Kafka レコードのスケーラブルな検索を可能にする) および Kibana (その結果を視覚化する) と統合する実際のデータの使用例について説明します。

図2

Kafka と Kafka Connect の利点を示すユースケースとして、CDC COVID-19 データ トラッカーからヒントを得ました。 Kafka ベースのトラッカーは、複数の場所、複数の形式、複数のプロトコルからリアルタイムの COVID-19 検査データを収集し、これらのイベントを使いやすい視覚化に処理します。トラッカーには、結果が迅速に届き、信頼できるものであることを保証するために必要なデータ ガバナンス メカニズムも導入されています。

私は、同様に複雑で説得力のある、しかし理想的にはコロナウイルスのパンデミックほど心配の少ないユースケースを探し始めました。最終的に、私は興味深いドメインを見つけました。Moontide には、公開されているストリーミング REST API と、シンプルな JSON 形式の豊富なデータが含まれています。

月の潮汐データ

潮汐は月の一日に従います。月の一日は 24 時間 50 分の周期で、その間に地球は軌道を周回する衛星の下の同じ地点まで完全に自転します。月の重力により、毎月 2 回、満潮と干潮が起こります。

図3. アメリカ海洋大気庁より

アメリカ海洋大気庁 (NOAA) は、世界中の潮位観測所から詳細なセンサー データを簡単に取得できる REST API を提供しています。

図4

たとえば、次の REST 呼び出しでは、潮位観測所 ID、データ タイプ (海面を選択)、およびデータ (平均海面) を指定し、最新の結果をメートル単位で要求します。

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json

この呼び出しは、潮位観測所の緯度と経度、時間、水位値を含む JSON 結果を返します。返される結果のデータ型、量、単位がわかるように、何を呼び出したかを覚えておく必要があることに注意してください。

  1. { "メタデータ" : {
  2. "id" : "8724580"
  3. 「名前」 : 「キーウェスト」
  4. "緯度" :"24.5508",
  5. "経度" : "-81.8081" },
  6. "データ" :[{
  7. "t" : "2020-09-24 04:18" ,
  8. "v" : "0.597" ,
  9. "s" : "0.005" "f" : "1,0,0,0" "q" : "p" }]}

データ パイプラインを開始する (REST ソース コネクタを使用)

Kafka Connect ストリーミング データ パイプラインの作成を開始するには、まず Kafka クラスターと Kafka Connect クラスターを準備する必要があります。

図5

次に、オープンソースで入手可能な REST コネクタをインポートします。これを AWS S3 バケットにデプロイします (必要に応じて、これらの手順に従ってください)。次に、Kafka Connect クラスターに S3 バケットを使用するように要求し、クラスター内で表示されるように同期し、コネクタを構成して、最後に実行します。この「BYOC」(Bring Your Own Connector)アプローチにより、特定の要件を満たすコネクタを見つける方法が無数に確保されます。

図6

次の例は、「curl」コマンドを使用して REST API を使用するように完全にオープン ソースの Kafka Connect デプロイメントを構成する方法を示しています。独自の展開に合わせて URL、名前、パスワードを変更する必要があることに注意してください。

  1. curl https://connectorClusterIP:8083/connectors -k -u名前:パスワード-X POST -H 'Content-Type: application/json' -d '
  2. {
  3. 「名前」 : 「source_rest_tide_1」
  4. 「設定」 :{
  5. "key.converter" : "org.apache.kafka.connect.storage.StringConverter"
  6. "value.converter" : "org.apache.kafka.connect.storage.StringConverter"
  7. "コネクタ.クラス" : "com.tm.kafka.connect.rest.RestSourceConnector"
  8. "タスク.max" : "1" ,
  9. "rest.source.poll.interval.ms" : "600000" ,
  10. "rest.source.method" : "GET"
  11. "rest.source.url" : "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json"
  12. "rest.source.headers" : "コンテンツタイプ:application/json、Accept:application/json"
  13. "rest.source.topic.selector" : "com.tm.kafka.connect.rest.selector.SimpleTopicSelector"
  14. "rest.source.destination.topics" : "潮汐トピック"  
  15. }
  16. }

このコードによって作成されたコネクタ タスクは、10 分間隔で REST API をポーリングし、その結果を「tides-topic」Kafka トピックに書き込みます。このように 5 つの潮汐センサーをランダムに選択してデータを収集すると、5 つの構成と 5 つの接続を通じて潮汐データが潮汐テーマに入力されるようになります。

図7

パイプラインを終了する(Elasticsearch シンクコネクタを使用)

この Tidal データをどこかに保存するには、パイプラインの最後に Elasticsearch クラスターと Kibana を導入します。 Elasticsearch にデータを送信するために、オープンソースの Elasticsearch シンク コネクタを構成します。

図8

次の構成例では、シンク名、クラス、Elasticsearch インデックス、および Kafka トピックを使用します。インデックスがまだ存在しない場合は、デフォルトのマッピングを使用してインデックスが作成されます。

  1. curl https://connectorClusterIP:8083/connectors -k -u名前:パスワード-X POST -H 'Content-Type: application/json' -d '
  2. {
  3. 「名前」 : 「弾性沈下潮」
  4. 「設定」 :
  5. {
  6. "コネクタ.クラス" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector"
  7. "タスク.max" : 3,
  8. 「トピック」 「潮汐」
  9. "connect.elastic.hosts" : "ip",
  10. "connect.elastic.port" : 9201,
  11. "connect.elastic.kcql" : "tides-indexに挿入し、tides-topicから*を選択します"
  12. "connect.elastic.use.http.username" : "elasticName",
  13. "connect.elastic.use.http.password" : "elasticPassword"
  14. }
  15. }'

パイプラインは現在稼働中です。ただし、デフォルトのインデックス マッピングにより、Tides インデックスに入力されるすべての潮汐データは文字列になります。

図9

時系列データを正確にプロットするには、カスタム マッピングが必要です。以下の Tidal インデックスのカスタム マッピングを作成します。カスタム日付には JSON の「t」フィールド、倍精度数値には「v」、集計を表すキーワードには「name」を使用します。

  1. curl -u elasticName:elasticPassword "elasticURL:9201/tides- index " -X PUT -H 'Content-Type: application/json' -d'
  2. {
  3. 「マッピング」 : {
  4. 「プロパティ」 : {
  5. "データ" : {
  6. 「プロパティ」 : {
  7. "t" : { "type" : "date" ,
  8. 「フォーマット」 : 「yyyy-MM-dd HH:mm」  
  9. },
  10. "v" : { "type" : "double" },
  11. "f" : { "type" : "text" },
  12. "q" : { "type" : "text" },
  13. "s" : { "タイプ" : "テキスト" }
  14. }
  15. },
  16. 「メタデータ」 : {
  17. 「プロパティ」 : {
  18. 「id」 : { 「タイプ」 : 「テキスト」 },
  19. "lat" : { "type" : "text" },
  20. "long" : { "type" : "text" },
  21. 「名前」 : { 「タイプ」 : 「キーワード」 } }}}} }'

Elasticsearch インデックス マッピングを変更するたびに、通常は Elasticsearch の「再インデックス」 (インデックスを削除し、すべてのデータの再インデックス) を行う必要があります。データは、このユースケースのように既存の Kafka シンク コネクタから再生することも、Elasticsearch の再インデックス操作を使用して取得することもできます。

Kibana でデータを視覚化する

潮汐データを視覚化するには、まず Kibana でインデックス パターンを作成し、時間フィルター フィールドとして「t」を設定します。次に、折れ線グラフの種類を選択して視覚化を作成します。最後に、グラフ設定を構成して、y 軸に 30 分間の平均潮位が表示され、x 軸にこのデータが時間の経過とともに表示されるようにします。

結果は、パイプラインがデータを収集する 5 つのサンプル潮位観測所における潮汐の変化を示す次のグラフになります。

図10

結果

視覚化により、毎月 2 回の満潮が発生するという潮の周期的な性質がはっきりとわかります。

図11

さらに驚くべきことは、満潮と干潮の間隔が世界中のすべての潮位観測所で同じではないということです。これは月だけでなく、太陽、地元の地理、天気、気候の変化によっても影響を受けます。この Kafka Connect パイプラインの例では、Kafka、Elasticsearch、Kibana を活用して視覚化の威力を実証しています。視覚化により、生データでは明らかにできない情報が明らかになることがよくあります。

元のタイトル: Kafka Connect を使用してリアルタイム データを処理するためのオープン ソース データ パイプラインを作成する方法、著者: Paul Brebner

[51CTOによる翻訳。パートナーサイトに転載する場合は、元の翻訳者と出典を51CTO.comとして明記してください。

<<:  分散調整フレームワークZookeeperのコア設計の理解と実践

>>:  私の国の通信事業者はクラウドコンピューティングの導入において3つの大きな課題に直面しています

推薦する

FinOpsクラウドコスト最適化は無視できない

調査会社ガートナーの調査によると、2022年末までに世界中の企業がクラウドコンピューティングインフラ...

HarmonyOS サンプル DistributedMusicPlayer 分散音楽プレーヤー

[[419218]]詳細については、以下をご覧ください。 51CTOとHuaweiが共同で構築したH...

locvpsの日本のVPSはどうですか? locvpsの実態を簡易評価する日本のvps

locvps の日本の VPS は、さまざまなニーズを持つ顧客に対応するために、小帯域幅無制限トラフ...

3つの主要なポイントがBaiduの競合他社をはるかに上回る

何を達成したいかに関係なく、まずは相手が何をどのようにやっているか、そして相手に追いつき追い越すため...

#サイト グループ サーバー#-tcloud-E3-1230/16g メモリ/1T ハード ディスク/100M 帯域幅/258IPv4

tcloud、英国での登録社名はTcloud Limited、中国での登録社名はNingbo Zho...

ウェブサイトのデザイン分析: 特別なウェブページのデザインについてどう思いますか?

トピックを理解する - 特定のテーマに関するトピックなので、導入部は不可欠です。このような導入部はす...

医療ウェブサイトのマーケティング状況に関する簡単な議論

医療ウェブサイトをコンバージョンさせる鍵は病院にたどり着くことです。そのためには、1.国内医療ステー...

ウェブサイトのキーワードポジショニングの重要性

多くの友人から「キーワードの位置づけはどうなっているの?」と聞かれます。このような問題はたくさんあり...

digitaloceanは5ドルと1か月の無料VPSを提供します

Digital Ocean から、ニューヨークに 2 番目のデータセンターを開設したというメールを受...

検索エンジンを通じてブランドマーケティングを行う方法

検索エンジンマーケティングは、その名前が示すように、インターネットユーザーに検索を促し、それが今日の...

当時の「ユーザー」をめぐる私たちの関係

SEO 3.0 時代では、すべてのウェブマスターがユーザー エクスペリエンスの真の意味を理解している...

友好的なリンク交換の3つの要素:ウェブサイトをスムーズに運営する

友好的なリンクは、ウェブサイト全体の重みとキーワードのランキングを向上させる上で重要な役割を果たしま...

ホストを選ぶには秘訣があります。特に2つのことをうまく行うことが重要です。

ホストの選択はウェブサイトにとって大きな問題です。一般的なウェブサイトの場合、ホストの選択はウェブサ...

どのようなウェブサイトを更新する必要があるのか​​、またどのようなウェブサイトを更新する必要がないのか

コンテンツと外部リンクは、ウェブサイトを最適化するプロセスにおいて SEO 担当者が最もよく使用する...

ウェブマスターネットワークからの毎日のレポート:Qunarの資金調達とAlipay情報の抜け穴が論争を巻き起こす

1. セルフメディアは収益モデルを見つけるのに苦労している:持続可能な開発が重要有名なセルフメディア...